Writing Python logs to Elasticsearch (logging/loguru, with customizable fields), with detailed code and examples

2023-05-16

Contents

  • 0 Results
  • 1 Preparation
  • 2 Usage
    • 2.1 Method 1 (logging)
    • 2.2 Method 2 (logging with a configuration dict)
    • 2.3 Method 3 (loguru)
  • 3 Adding or removing the fields written to es
    • 3.1 Adding fields
    • 3.2 Removing fields

0 Results

Records as written by the original handler:
[image]

Records after customizing the written fields:

[image]

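1 Preparation

Logs are written to es with either Python's built-in logging module or the loguru module (a third-party logging library that accepts standard logging.Handler objects as sinks).

Install the packages as follows. An elasticsearch client version below 8 is recommended: the handler below imports RequestsHttpConnection from the elasticsearch package, and that import can fail with newer client versions.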

pip3 install "elasticsearch==7.9.1" -i  https://pypi.tuna.tsinghua.edu.cn/simple
pip3 install loguru -i  https://pypi.tuna.tsinghua.edu.cn/simple

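es is written to through a modified copy of the CMRESHandler source file, saved as handlers.py (the module that the examples below import). The code, with the IP lookup error fixed, is as follows: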

#!/usr/bin/env python3

# Connect to and initialize Elasticsearch

import logging
import datetime
import socket
from threading import Timer, Lock
from enum import Enum
from elasticsearch import helpers as eshelpers
from elasticsearch import Elasticsearch, RequestsHttpConnection


# from CMRESSerializer import CMRESSerializer
# from getLocal_ip import get_local_ip
# import settings

from elasticsearch.serializer import JSONSerializer



class CMRESSerializer(JSONSerializer):
    def default(self, data):
        try:
            return super(CMRESSerializer, self).default(data)
        except TypeError:
            return str(data)


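def get_local_ip():
    """
    Get the local IP address.

    :return: the local IP as a string, or '' if it cannot be determined
    """

    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # Connecting a UDP socket sends no packet; it only selects the outgoing interface
        s.connect(('8.8.8.8', 80))
        return s.getsockname()[0]
    except Exception as e:
        print(e)
        return ''
    finally:
        s.close()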



class CMRESHandler(logging.Handler):
    """ Elasticsearch log handler
    """

    class AuthType(Enum):
        """ Authentication types supported

        The handler supports
         - No authentication
         - Basic authentication
        """
        NO_AUTH = 0
        BASIC_AUTH = 1
        DEVOPS_AUTH = 2

    class IndexNameFrequency(Enum):
        """ Index type supported
        the handler supports
        - Daily indices
        - Weekly indices
        - Monthly indices
        - Year indices
        """
        DAILY = 0
        WEEKLY = 1
        MONTHLY = 2
        YEARLY = 3

    # Defaults for the class
    __DEFAULT_ELASTICSEARCH_HOST = [{'host': 'localhost', 'port': 9200}]
    __DEFAULT_AUTH_USER = 'admin'
    __DEFAULT_AUTH_PASSWD = 'admin'

    __DEFAULT_USE_SSL = False
    __DEFAULT_VERIFY_SSL = True
    __DEFAULT_AUTH_TYPE = AuthType.NO_AUTH
    __DEFAULT_INDEX_FREQUENCY = IndexNameFrequency.DAILY
    __DEFAULT_BUFFER_SIZE = 1000
    __DEFAULT_FLUSH_FREQ_INSEC = 1
    __DEFAULT_ADDITIONAL_FIELDS = {}
    __DEFAULT_ES_INDEX_NAME = 'python_logger'
    __DEFAULT_ES_DOC_TYPE = '_doc'
    __DEFAULT_RAISE_ON_EXCEPTION = False
    __DEFAULT_TIMESTAMP_FIELD_NAME = "timestamp"
    __DEFAULT_ISO_TIMESTAMP_FIELD_NAME = "iso_timestamp"

    __LOGGING_FILTER_FIELDS = ['msecs',
                               'relativeCreated',
                               'levelno',
                               'created']

    @staticmethod
    def _get_daily_index_name(es_index_name):
        """ Returns elasticsearch index name
        :param es_index_name: the prefix to be used in the index
        :return: A string containing the elasticsearch index name. The daily date suffix is disabled in this copy.
        """
        # return "{0!s}-{1!s}".format(es_index_name, datetime.datetime.now().strftime('%Y.%m.%d'))
        return es_index_name

    @staticmethod
    def _get_weekly_index_name(es_index_name):
        """ Return elasticsearch index name
        :param es_index_name: the prefix to be used in the index
        :return: A string containing the elasticsearch index name. The weekly date suffix is disabled in this copy.
        """
        # current_date = datetime.datetime.now()
        # start_of_the_week = current_date - datetime.timedelta(days=current_date.weekday())
        # return "{0!s}-{1!s}".format(es_index_name, start_of_the_week.strftime('%Y.%m.%d'))
        return es_index_name

    @staticmethod
    def _get_monthly_index_name(es_index_name):
        """ Return elasticsearch index name
        :param es_index_name: the prefix to be used in the index
        :return: A string containing the elasticsearch index name. The monthly date suffix is disabled in this copy.
        """
        # return "{0!s}-{1!s}".format(es_index_name, datetime.datetime.now().strftime('%Y.%m'))
        return es_index_name

    @staticmethod
    def _get_yearly_index_name(es_index_name):
        """ Return elasticsearch index name
        :param es_index_name: the prefix to be used in the index
        :return: A string containing the elasticsearch index name. The yearly date suffix is disabled in this copy.
        """
        # return "{0!s}-{1!s}".format(es_index_name, datetime.datetime.now().strftime('%Y'))
        return es_index_name

    _INDEX_FREQUENCY_FUNCION_DICT = {
        IndexNameFrequency.DAILY: _get_daily_index_name,
        IndexNameFrequency.WEEKLY: _get_weekly_index_name,
        IndexNameFrequency.MONTHLY: _get_monthly_index_name,
        IndexNameFrequency.YEARLY: _get_yearly_index_name
    }

    def __init__(self,
                 hosts=__DEFAULT_ELASTICSEARCH_HOST,
                 auth_details=(__DEFAULT_AUTH_USER, __DEFAULT_AUTH_PASSWD),
                 auth_type=__DEFAULT_AUTH_TYPE,
                 use_ssl=__DEFAULT_USE_SSL,
                 verify_ssl=__DEFAULT_VERIFY_SSL,
                 buffer_size=__DEFAULT_BUFFER_SIZE,
                 flush_frequency_in_sec=__DEFAULT_FLUSH_FREQ_INSEC,
                 es_index_name=__DEFAULT_ES_INDEX_NAME,
                 index_name_frequency=__DEFAULT_INDEX_FREQUENCY,
                 es_doc_type=__DEFAULT_ES_DOC_TYPE,
                 es_additional_fields=__DEFAULT_ADDITIONAL_FIELDS,
                 raise_on_indexing_exceptions=__DEFAULT_RAISE_ON_EXCEPTION,
                 default_iso_timestamp_field_name=__DEFAULT_ISO_TIMESTAMP_FIELD_NAME,
                 default_timestamp_field_name=__DEFAULT_TIMESTAMP_FIELD_NAME):
        """ Handler constructor

        :param hosts: The list of hosts that elasticsearch clients will connect. The list can be provided
                    in the format ``[{'host':'host1','port':9200}, {'host':'host2','port':9200}]`` to
                    make sure the client supports failover of one of the insertion nodes
        :param auth_details: When ``CMRESHandler.AuthType.BASIC_AUTH`` is used this argument must contain
                    a tuple of strings with the user and password that will be used to authenticate against
                    the Elasticsearch servers, for example ``('User','Password')``
        :param auth_type: The authentication type to be used in the connection, a ``CMRESHandler.AuthType``.
                    Only NO_AUTH and BASIC_AUTH are handled by this copy; DEVOPS_AUTH raises ValueError
        :param use_ssl: A boolean that defines if the communications should use SSL encrypted communication
        :param verify_ssl: A boolean that defines if the SSL certificates are validated or not
        :param buffer_size: An int, Once this size is reached on the internal buffer results are flushed into ES
        :param flush_frequency_in_sec: A float representing how often and when the buffer will be flushed, even
                    if the buffer_size has not been reached yet
        :param es_index_name: A string with the name of the elasticsearch index that will be written to,
                    ``python_logger`` by default. The date suffix is disabled in this copy
        :param index_name_frequency: Selects the index-name function. Available values
                    are selected from the IndexNameFrequency class (IndexNameFrequency.DAILY,
                    IndexNameFrequency.WEEKLY, IndexNameFrequency.MONTHLY, IndexNameFrequency.YEARLY). By default
                    it uses DAILY; in this copy every choice returns the index name unchanged.
        :param es_doc_type: A string with the name of the document type that will be used, ``_doc``
                    by default
        :param es_additional_fields: A dictionary with all the additional fields that you would like to add
                    to the logs, such as the application, environment, etc.
        :param raise_on_indexing_exceptions: A boolean, True only for debugging purposes to raise exceptions
                    caused when indexing fails
        :param default_iso_timestamp_field_name: Stored on the handler but not used by the code shown here
        :param default_timestamp_field_name: Name of the document field that holds the record timestamp
        :return: A ready to be used CMRESHandler.
        """
        logging.Handler.__init__(self)

        self.hosts = hosts
        self.auth_details = auth_details
        self.auth_type = auth_type
        self.use_ssl = use_ssl
        self.verify_certs = verify_ssl

        self.buffer_size = buffer_size
        self.flush_frequency_in_sec = flush_frequency_in_sec
        self.es_index_name = es_index_name
        self.index_name_frequency = index_name_frequency
        self.es_doc_type = es_doc_type
        self.es_additional_fields = es_additional_fields.copy()

        # The original code below raised: socket.gaierror: [Errno 8] nodename nor servname provided, or not known
        # self.es_additional_fields.update({'host': socket.gethostname(),
        #                                   'host_ip': socket.gethostbyname(socket.gethostname())})

        self.raise_on_indexing_exceptions = raise_on_indexing_exceptions
        self.default_iso_timestamp_field_name = default_iso_timestamp_field_name
        self.default_timestamp_field_name = default_timestamp_field_name

        self._client = None
        self._buffer = []
        self._buffer_lock = Lock()
        self._timer = None
        self._index_name_func = CMRESHandler._INDEX_FREQUENCY_FUNCION_DICT[self.index_name_frequency]
        self.serializer = CMRESSerializer()

    def __schedule_flush(self):
        if self._timer is None:
            self._timer = Timer(self.flush_frequency_in_sec, self.flush)
            self._timer.daemon = True  # setDaemon() is deprecated
            self._timer.start()

    def __get_es_client(self):
        if self.auth_type == CMRESHandler.AuthType.NO_AUTH:
            if self._client is None:
                self._client = Elasticsearch(hosts=self.hosts,
                                             use_ssl=self.use_ssl,
                                             verify_certs=self.verify_certs,
                                             connection_class=RequestsHttpConnection,
                                             serializer=self.serializer)
            return self._client

        if self.auth_type == CMRESHandler.AuthType.BASIC_AUTH:
            if self._client is None:
                # Cache the client so that it is not recreated on every flush
                self._client = Elasticsearch(hosts=self.hosts,
                                             http_auth=self.auth_details,
                                             use_ssl=self.use_ssl,
                                             verify_certs=self.verify_certs,
                                             connection_class=RequestsHttpConnection,
                                             serializer=self.serializer)
            return self._client

        raise ValueError("Authentication method not supported")

    def test_es_source(self):
        """ Returns True if the handler can ping the Elasticsearch servers

        :return: A boolean, True if the connection against elasticsearch host was successful
        """
        return self.__get_es_client().ping()

    @staticmethod
    def __get_es_datetime_str(timestamp):
        """ Returns an elasticsearch-formatted time string for an epoch timestamp

        The UTC time is shifted by a fixed +8 hours (UTC+8); no timezone suffix is written.

        :param timestamp: epoch, including milliseconds
        :return: A string valid for elasticsearch time record
        """

        current_date = datetime.datetime.utcfromtimestamp(timestamp)
        # Zero-pad the microseconds so that e.g. 5000 microseconds is written as .005000, not .5000
        return "{0!s}.{1:06d}".format(
            datetime.datetime.strftime(current_date + datetime.timedelta(hours=8), '%Y-%m-%dT%H:%M:%S'),
            int(current_date.microsecond))

    def flush(self):
        """ Flushes the buffer into ES
        :return: None
        """
        if self._timer is not None and self._timer.is_alive():
            self._timer.cancel()
        self._timer = None

        if self._buffer:
            try:
                with self._buffer_lock:
                    logs_buffer = self._buffer
                    self._buffer = []
                actions = (
                    {
                        '_index': self._index_name_func.__func__(self.es_index_name),
                        '_type': self.es_doc_type,
                        '_source': log_record
                    }
                    for log_record in logs_buffer
                )
                eshelpers.bulk(
                    client=self.__get_es_client(),
                    actions=actions,
                    stats_only=True
                )
            except Exception as exception:
                if self.raise_on_indexing_exceptions:
                    raise exception

    def close(self):
        """ Flushes the buffer and release any outstanding resource

        :return: None
        """
        if self._timer is not None:
            self.flush()
        self._timer = None

    def emit(self, record):
        """ Emit overrides the abstract logging.Handler logRecord emit method

        Format and records the log

        :param record: A class of type ```logging.LogRecord```
        :return: None
        """
        self.format(record)

        rec = self.es_additional_fields.copy()

        for key, value in record.__dict__.items():
            if key not in CMRESHandler.__LOGGING_FILTER_FIELDS:
                rec[key] = "" if value is None else value
        rec[self.default_timestamp_field_name] = self.__get_es_datetime_str(record.created)

        with self._buffer_lock:
            self._buffer.append(rec)

        if len(self._buffer) >= self.buffer_size:
            self.flush()
        else:
            self.__schedule_flush()
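
The buffering pattern used by the handler above (append under a lock, flush when the buffer is full, otherwise let a daemon timer flush it later) can be sketched without Elasticsearch. This is only an illustration: BufferedListHandler and its sink argument are hypothetical names, and the sink (any callable that takes a list of dicts) stands in for the bulk write.

```python
import logging
import threading


class BufferedListHandler(logging.Handler):
    """Hypothetical handler: buffers records and hands them to `sink` in batches."""

    def __init__(self, sink, buffer_size=3, flush_interval=0.2):
        super().__init__()
        self._sink = sink                      # stands in for the es bulk call
        self._buffer_size = buffer_size
        self._flush_interval = flush_interval
        self._buffer = []
        self._buffer_lock = threading.Lock()
        self._timer = None

    def _schedule_flush(self):
        # Start a single daemon timer if none is pending.
        if self._timer is None:
            self._timer = threading.Timer(self._flush_interval, self.flush)
            self._timer.daemon = True
            self._timer.start()

    def flush(self):
        # Cancel any pending timer, then swap the buffer out under the lock
        # and call the sink outside the lock.
        timer, self._timer = self._timer, None
        if timer is not None:
            timer.cancel()
        with self._buffer_lock:
            batch, self._buffer = self._buffer, []
        if batch:
            self._sink(batch)

    def emit(self, record):
        doc = {"message": record.getMessage(), "levelname": record.levelname}
        with self._buffer_lock:
            self._buffer.append(doc)
            full = len(self._buffer) >= self._buffer_size
        if full:
            self.flush()
        else:
            self._schedule_flush()


batches = []
handler = BufferedListHandler(batches.append, buffer_size=2, flush_interval=60)
log = logging.getLogger("buffer_demo")
log.setLevel(logging.DEBUG)
log.propagate = False
log.addHandler(handler)

log.info("first")
log.info("second")  # buffer is full, so it is flushed immediately
log.info("third")   # stays in the buffer until the timer fires
handler.flush()     # force the remaining record out
```

With a long flush interval, as here, the last record would sit in the buffer until flush() is called; the real handler uses a 1 second default for the same purpose.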

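2 Usage

Method 3 (loguru) is recommended: the format of the message written to es can be changed, and the terminal output is nicely formatted. With methods 1 and 2 (logging), no working way to change the message format was found (the handler's emit() calls self.format(record) but discards the returned string, so a Formatter does not change the stored fields), and the terminal output is less readable.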
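
Why a Formatter has no visible effect with methods 1 and 2 can be reproduced with the standard library alone. The sketch below uses a hypothetical DictCaptureHandler that, like the handler in section 1, builds its document from record attributes; the formatted line is kept separately only so the two can be compared.

```python
import logging


class DictCaptureHandler(logging.Handler):
    """Hypothetical stand-in for CMRESHandler that keeps documents in memory."""

    def __init__(self):
        super().__init__()
        self.docs = []
        self.lines = []

    def emit(self, record):
        # format() returns the formatted line and also sets record.message
        formatted = self.format(record)
        self.lines.append(formatted)
        # The document is built from record attributes, not from `formatted`
        self.docs.append({"message": record.message, "levelname": record.levelname})


handler = DictCaptureHandler()
handler.setFormatter(logging.Formatter("%(levelname)s | %(name)s | %(message)s"))
log = logging.getLogger("format_demo")
log.setLevel(logging.DEBUG)
log.propagate = False
log.addHandler(handler)
log.debug("value=%d", 42)

print(handler.lines[0])  # the formatted line
print(handler.docs[0])   # the fields that would be indexed
```

If the formatted line is wanted in es as well, one option is to store the return value of self.format(record) under a field of its own inside emit().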

2.1 Method 1 (logging)

import logging
from handlers import CMRESHandler

LOG_LEVEL = 'DEBUG'  # log level
LOG_FORMAT = '%(levelname)s - %(asctime)s - process: %(process)d - %(filename)s - %(name)s - %(lineno)d - %(module)s - %(message)s'  # output format of each log line
ELASTIC_SEARCH_HOST = 'localhost'  # Elasticsearch host
ELASTIC_SEARCH_PORT = 9200  # Elasticsearch port
ELASTIC_SEARCH_INDEX = 'test_log3'  # Elasticsearch index name
APP_ENVIRONMENT = 'dev'  # runtime environment, e.g. test or production

ELASTICSEARCH_USER = 'admin'
ELASTICSEARCH_PASSWORD = 'admin'

es_handler = CMRESHandler(hosts=[{'host': ELASTIC_SEARCH_HOST, 'port': ELASTIC_SEARCH_PORT}],
                          # user name and password
                          auth_details=(ELASTICSEARCH_USER, ELASTICSEARCH_PASSWORD),
                          # authentication type
                          auth_type=CMRESHandler.AuthType.BASIC_AUTH,
                          # index name
                          es_index_name=ELASTIC_SEARCH_INDEX,
                          # extra field that identifies the environment
                          es_additional_fields={'environment': APP_ENVIRONMENT}
                          )

# The commented-out level and format settings below had no effect on the data written to es
# es_handler.setLevel(level=LOG_LEVEL)
# formatter = logging.Formatter(LOG_FORMAT)
# es_handler.setFormatter(formatter)
logger = logging.getLogger('test')
logger.setLevel(LOG_LEVEL)
logger.addHandler(es_handler)
logger.debug('test write es2')

if __name__ == '__main__':
    # logger = get_logger()
    # logger.debug('this is a message 12')
    pass

2.2 Method 2 (logging with a configuration dict)

# es connection settings
import datetime
import logging.config
import handlers

ELASTICSEARCH_HOST = 'localhost'
ELASTICSEARCH_PORT = '9200'
LOG_INDEX = 'test_log6'
ELASTICSEARCH_USER = 'admin'
ELASTICSEARCH_PASSWORD = 'admin'

if __name__ == '__main__':
    config = {
        'version': 1,
        'disable_existing_loggers': False,
        'formatters': {
            'simple': {
                'format': '%(levelname)s - %(asctime)s - process: %(process)d - %(filename)s - %(name)s - %(lineno)d - %(module)s - %(message)s'  # the format has no effect on the fields written to es
            }
        },
        'handlers': {
            'console': {
                'level': 'ERROR',
                'class': 'logging.StreamHandler',
                'formatter': 'simple'
            },
            'elasticsearch': {
                'level': 'ERROR',
                'class': 'handlers.CMRESHandler',
                'hosts': [{'host': ELASTICSEARCH_HOST, 'port': ELASTICSEARCH_PORT}],
                'es_index_name': LOG_INDEX,
                'es_additional_fields': {'logTime': datetime.datetime.now()},  # evaluated once, when this dict is built
                'auth_type': handlers.CMRESHandler.AuthType.BASIC_AUTH,
                'auth_details': (ELASTICSEARCH_USER, ELASTICSEARCH_PASSWORD),
                # 'flush_frequency_in_sec': 10,
                'use_ssl': False,
                'formatter': 'simple'
            }
        },
        'loggers': {
            'log': {
                'handlers': ['console', 'elasticsearch'],
                'level': 'ERROR',
                'propagate': True,
                'disable_existing_loggers': False,
                'formatter': 'simple'
            }
        },
    }
    logging.config.dictConfig(config)
    logger = logging.getLogger('log')  # use the 'log' logger configured above
    logger.error('test9')

Terminal output:

[image]
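
How dictConfig turns the extra keys of a handler entry into constructor arguments can be tried out without Elasticsearch. In this sketch FieldsHandler is a hypothetical in-memory handler, and the '()' key passes the class itself as the factory so that no importable module path is needed; with a real module, the 'class': 'handlers.CMRESHandler' form used above does the same job.

```python
import logging
import logging.config


class FieldsHandler(logging.Handler):
    """Hypothetical handler that takes custom keyword arguments, as CMRESHandler does."""

    def __init__(self, index_name="python_logger", additional_fields=None):
        super().__init__()
        self.index_name = index_name
        self.additional_fields = dict(additional_fields or {})
        self.docs = []

    def emit(self, record):
        doc = dict(self.additional_fields)
        doc["message"] = record.getMessage()
        self.docs.append(doc)


config = {
    "version": 1,
    "disable_existing_loggers": False,
    "handlers": {
        "capture": {
            "()": FieldsHandler,  # callable factory instead of a dotted "class" path
            "level": "ERROR",     # consumed by dictConfig, not passed to the constructor
            "index_name": "test_log6",
            "additional_fields": {"environment": "dev"},
        }
    },
    "loggers": {
        "dictconfig_demo": {
            "handlers": ["capture"],
            "level": "DEBUG",
            "propagate": False,
        }
    },
}

logging.config.dictConfig(config)
log = logging.getLogger("dictconfig_demo")
log.info("below the handler level, so it is dropped")
log.error("kept")

capture = log.handlers[0]
print(capture.index_name, capture.docs)
```

Values such as datetime.datetime.now() placed in the config are computed once when the dict is built, so every record would carry the same value; per-record values are better computed inside emit().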

2.3 Method 3 (loguru)

from loguru import logger
from handlers import CMRESHandler


def writeLog2Elasticsearch(message: str, log_index: str, log_level='info', host_name='localhost', port=9200,
                           user_name='admin', password='admin', environment_level='test',
                           log_format='{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}'):

    handler = CMRESHandler(hosts=[{'host': host_name, 'port': port}],
                           auth_details=(user_name, password),
                           # authentication type
                           auth_type=CMRESHandler.AuthType.BASIC_AUTH,
                           # index name in es
                           es_index_name=log_index,
                           # one index per month
                           # index_name_frequency=CMRESHandler.IndexNameFrequency.MONTHLY,
                           # extra fields
                           es_additional_fields={'environment': environment_level}
                           )

    # Register the handler as a loguru sink and set the message format
    trace = logger.add(handler, format=log_format)

    if log_level == 'info':
        logger.info(message)
    elif log_level == 'warning':
        logger.warning(message)
    elif log_level == 'error':
        logger.error(message)
    elif log_level == 'debug':
        logger.debug(message)
    else:
        pass  # unknown levels are ignored

    # Remove the sink again so that repeated calls do not write each message more than once
    logger.remove(trace)


if __name__ == '__main__':
    writeLog2Elasticsearch(log_index='test_es3', message='test213', log_level='error')

Terminal output:
[image]

The details field in es (formatted as specified):
[image]
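
The reason the sink is removed at the end of each call can be shown with the standard logging module, which behaves analogously here (loguru itself is not used, to keep the sketch dependency-free). ListHandler and attached are hypothetical helpers: a handler that is added on every call but never removed keeps receiving later messages, so those messages are stored more than once.

```python
import logging
from contextlib import contextmanager


class ListHandler(logging.Handler):
    """Hypothetical in-memory handler standing in for CMRESHandler."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


@contextmanager
def attached(logger, handler):
    """Attach a handler for the duration of the block, then detach and close it."""
    logger.addHandler(handler)
    try:
        yield handler
    finally:
        logger.removeHandler(handler)
        handler.close()


log = logging.getLogger("dup_demo")
log.setLevel(logging.DEBUG)
log.propagate = False

# Without removal: every call adds one more handler.
leaky = []
for message in ("a", "b"):
    h = ListHandler()
    leaky.append(h)
    log.addHandler(h)
    log.info(message)
total_leaky = sum(len(h.messages) for h in leaky)  # "b" is stored twice

for h in leaky:
    log.removeHandler(h)

# With removal: each message is stored exactly once.
clean = []
for message in ("a", "b"):
    h = ListHandler()
    clean.append(h)
    with attached(log, h):
        log.info(message)
total_clean = sum(len(h.messages) for h in clean)

print(total_leaky, total_clean)
```

An alternative design is to register the handler once at program start and reuse it for every message, which also avoids creating a new client per call.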

3 Adding or removing the fields written to es

3.1 Adding fields

Pass the extra fields to CMRESHandler through the es_additional_fields argument.

    handler = CMRESHandler(hosts=[{'host': host_name, 'port': port}],
                           auth_details=(user_name, password),
                           # authentication type
                           auth_type=CMRESHandler.AuthType.BASIC_AUTH,
                           # index name in es
                           es_index_name=log_index,
                           # one index per month
                           # index_name_frequency=CMRESHandler.IndexNameFrequency.MONTHLY,
                           # extra fields
                           es_additional_fields={'environment': environment_level}
                           )
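
es_additional_fields attaches the same values to every record. For per-record values, the emit() shown in section 1 copies all attributes of the LogRecord, so with plain logging the standard extra= argument should also end up in the document. The sketch below reproduces that copying logic in a hypothetical in-memory DocHandler; it is an assumption that this matches your deployed handler, and it does not apply to the modified emit() in section 3.2, which no longer copies record.__dict__.

```python
import logging

# Same attribute names the handler in section 1 filters out
FILTERED_FIELDS = {"msecs", "relativeCreated", "levelno", "created"}


class DocHandler(logging.Handler):
    """Hypothetical handler that builds documents the way emit() in section 1 does."""

    def __init__(self, additional_fields=None):
        super().__init__()
        self.additional_fields = dict(additional_fields or {})
        self.docs = []

    def emit(self, record):
        self.format(record)  # sets record.message
        doc = dict(self.additional_fields)
        for key, value in record.__dict__.items():
            if key not in FILTERED_FIELDS:
                doc[key] = "" if value is None else value
        self.docs.append(doc)


handler = DocHandler({"environment": "test"})
log = logging.getLogger("extra_demo")
log.setLevel(logging.DEBUG)
log.propagate = False
log.addHandler(handler)

# `user_id` is an illustrative per-record field
log.info("user logged in", extra={"user_id": 7})
doc = handler.docs[0]
print(doc["environment"], doc["user_id"], doc["message"])
```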

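3.2 Removing fields

If CMRESHandler is imported from the installed library with from cmreslogging.handlers import CMRESHandler, fields can only be added; no interface for removing fields was found (and the error socket.gaierror: [Errno 8] nodename nor servname provided, or not known may also occur). To customize the emitted fields, the library file has to be copied out of the package and modified locally.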
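
The socket.gaierror mentioned above comes from resolving the machine's own host name. A minimal sketch of a lookup that tolerates that failure is shown below; safe_host_ip and resolve_hostname_ip are hypothetical helper names, and the probe address only selects a network route (connecting a UDP socket sends no packet).

```python
import socket


def resolve_hostname_ip():
    """Lookup used by the original library; raises socket.gaierror if the host name does not resolve."""
    return socket.gethostbyname(socket.gethostname())


def get_local_ip(probe_addr=("8.8.8.8", 80)):
    """Ask the OS which local address it would use to reach probe_addr."""
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(probe_addr)  # no traffic is sent for a UDP connect
            return s.getsockname()[0]
    except OSError:
        return ""


def safe_host_ip():
    """Try the host-name lookup first and fall back to the route-based lookup."""
    try:
        return resolve_hostname_ip()
    except OSError:  # socket.gaierror is a subclass of OSError
        return get_local_ip()


ip = safe_host_ip()
print(repr(ip))
```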

The main changes are in the emit function and the __init__ function:

emit function (what gets emitted is the content of the rec dict):

    def emit(self, record):
        """ Emit overrides the abstract logging.Handler logRecord emit method

        Format and records the log

        :param record: A class of type ``logging.LogRecord``
        :return: None
        """
        self.format(record)
        # Start from a copy of the shared fields, then add the fields you need.
        # Updating the copy keeps self.es_additional_fields unchanged between records.
        rec = self.es_additional_fields.copy()
        rec.update({
            'host': get_local_ip(),
            # 'service': 'spark-server', # settings.service_name
            'logLevel': record.levelname,
            'details': record.getMessage(),  # message with its arguments applied
            'stackTrace': record.exc_info,
            'thread': record.threadName,
            'logger': record.pathname,
            'logData': record.exc_text
        })

        # Unwanted fields are dropped by no longer copying record.__dict__
        # for key, value in record.__dict__.items():
        #     if key not in CMRESHandler.__LOGGING_FILTER_FIELDS:
        #         rec[key] = "" if value is None else value

        rec[self.default_timestamp_field_name] = self.__get_es_datetime_str(record.created)

        with self._buffer_lock:
            self._buffer.append(rec)

        if len(self._buffer) >= self.buffer_size:
            self.flush()
        else:
            self.__schedule_flush()
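
The whitelist idea behind this emit() can be tested on its own, without a handler or an es connection. The sketch below is a simplified variant under stated assumptions: build_document is a hypothetical helper, the host value is passed in rather than looked up, and only a few of the fields above are included.

```python
import logging


def build_document(record, additional_fields, host=""):
    """Build the dict to index from a whitelist of LogRecord attributes."""
    doc = dict(additional_fields)  # copy, so the shared fields are never mutated
    doc.update({
        "host": host,
        "logLevel": record.levelname,
        "details": record.getMessage(),
        "thread": record.threadName,
        "logger": record.pathname,
    })
    return doc


shared_fields = {"environment": "test"}
record = logging.makeLogRecord({
    "msg": "disk usage at %d percent",
    "args": (91,),
    "levelname": "WARNING",
    "pathname": "/app/job.py",  # illustrative path
})
doc = build_document(record, shared_fields, host="10.0.0.5")
print(doc)
```

Keeping the field selection in a plain function like this makes it easy to unit-test which fields reach es before wiring it into emit().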

__init__ function:

    def __init__(self,
                 hosts=__DEFAULT_ELASTICSEARCH_HOST,
                 auth_details=(__DEFAULT_AUTH_USER, __DEFAULT_AUTH_PASSWD),
                 auth_type=__DEFAULT_AUTH_TYPE,
                 use_ssl=__DEFAULT_USE_SSL,
                 verify_ssl=__DEFAULT_VERIFY_SSL,
                 buffer_size=__DEFAULT_BUFFER_SIZE,
                 flush_frequency_in_sec=__DEFAULT_FLUSH_FREQ_INSEC,
                 es_index_name=__DEFAULT_ES_INDEX_NAME,
                 index_name_frequency=__DEFAULT_INDEX_FREQUENCY,
                 es_doc_type=__DEFAULT_ES_DOC_TYPE,
                 es_additional_fields=__DEFAULT_ADDITIONAL_FIELDS,
                 raise_on_indexing_exceptions=__DEFAULT_RAISE_ON_EXCEPTION,
                 default_iso_timestamp_field_name=__DEFAULT_ISO_TIMESTAMP_FIELD_NAME,
                 default_timestamp_field_name=__DEFAULT_TIMESTAMP_FIELD_NAME):
                 
        # ...
        # Comment out the fields added by the original library:
        # self.es_additional_fields.update({'host': socket.gethostname(),
        #                                   'host_ip': socket.gethostbyname(socket.gethostname())})
        # ...

The complete handlers.py is as follows:

#!/usr/bin/env python3

# Connect to and initialize Elasticsearch

import logging
import datetime
import socket
from threading import Timer, Lock
from enum import Enum
from elasticsearch import helpers as eshelpers
from elasticsearch import Elasticsearch, RequestsHttpConnection


# from CMRESSerializer import CMRESSerializer
# from getLocal_ip import get_local_ip
# import settings

from elasticsearch.serializer import JSONSerializer


class CMRESSerializer(JSONSerializer):
    def default(self, data):
        try:
            return super(CMRESSerializer, self).default(data)
        except TypeError:
            return str(data)


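def get_local_ip():
    """
    Get the local IP address.

    :return: the local IP as a string, or '' if it cannot be determined
    """

    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # Connecting a UDP socket sends no packet; it only selects the outgoing interface
        s.connect(('8.8.8.8', 80))
        return s.getsockname()[0]
    except Exception as e:
        print(e)
        return ''
    finally:
        s.close()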



class CMRESHandler(logging.Handler):
    """ Elasticsearch log handler
    """

    class AuthType(Enum):
        """ Authentication types supported

        The handler supports
         - No authentication
         - Basic authentication
        """
        NO_AUTH = 0
        BASIC_AUTH = 1
        DEVOPS_AUTH = 2

    class IndexNameFrequency(Enum):
        """ Index type supported
        the handler supports
        - Daily indices
        - Weekly indices
        - Monthly indices
        - Year indices
        """
        DAILY = 0
        WEEKLY = 1
        MONTHLY = 2
        YEARLY = 3

    # Defaults for the class
    __DEFAULT_ELASTICSEARCH_HOST = [{'host': 'localhost', 'port': 9200}]
    __DEFAULT_AUTH_USER = 'admin'
    __DEFAULT_AUTH_PASSWD = 'admin'

    __DEFAULT_USE_SSL = False
    __DEFAULT_VERIFY_SSL = True
    __DEFAULT_AUTH_TYPE = AuthType.NO_AUTH
    __DEFAULT_INDEX_FREQUENCY = IndexNameFrequency.DAILY
    __DEFAULT_BUFFER_SIZE = 1000
    __DEFAULT_FLUSH_FREQ_INSEC = 1
    __DEFAULT_ADDITIONAL_FIELDS = {}
    __DEFAULT_ES_INDEX_NAME = 'python_logger'
    __DEFAULT_ES_DOC_TYPE = '_doc'
    __DEFAULT_RAISE_ON_EXCEPTION = False
    __DEFAULT_TIMESTAMP_FIELD_NAME = "timestamp"
    __DEFAULT_ISO_TIMESTAMP_FIELD_NAME = "iso_timestamp"

    __LOGGING_FILTER_FIELDS = ['msecs',
                               'relativeCreated',
                               'levelno',
                               'created']

    @staticmethod
    def _get_daily_index_name(es_index_name):
        """ Returns elasticsearch index name
        :param es_index_name: the prefix to be used in the index
        :return: A string containing the elasticsearch index name. The daily date suffix is disabled in this copy.
        """
        # return "{0!s}-{1!s}".format(es_index_name, datetime.datetime.now().strftime('%Y.%m.%d'))
        return es_index_name

    @staticmethod
    def _get_weekly_index_name(es_index_name):
        """ Return elasticsearch index name
        :param es_index_name: the prefix to be used in the index
        :return: A string containing the elasticsearch index name. The weekly date suffix is disabled in this copy.
        """
        # current_date = datetime.datetime.now()
        # start_of_the_week = current_date - datetime.timedelta(days=current_date.weekday())
        # return "{0!s}-{1!s}".format(es_index_name, start_of_the_week.strftime('%Y.%m.%d'))
        return es_index_name

    @staticmethod
    def _get_monthly_index_name(es_index_name):
        """ Return elasticsearch index name
        :param es_index_name: the prefix to be used in the index
        :return: A string containing the elasticsearch index name. The monthly date suffix is disabled in this copy.
        """
        # return "{0!s}-{1!s}".format(es_index_name, datetime.datetime.now().strftime('%Y.%m'))
        return es_index_name

    @staticmethod
    def _get_yearly_index_name(es_index_name):
        """ Return elasticsearch index name
        :param es_index_name: the prefix to be used in the index
        :return: A string containing the elasticsearch index name. The yearly date suffix is disabled in this copy.
        """
        # return "{0!s}-{1!s}".format(es_index_name, datetime.datetime.now().strftime('%Y'))
        return es_index_name

    _INDEX_FREQUENCY_FUNCION_DICT = {
        IndexNameFrequency.DAILY: _get_daily_index_name,
        IndexNameFrequency.WEEKLY: _get_weekly_index_name,
        IndexNameFrequency.MONTHLY: _get_monthly_index_name,
        IndexNameFrequency.YEARLY: _get_yearly_index_name
    }

    def __init__(self,
                 hosts=__DEFAULT_ELASTICSEARCH_HOST,
                 auth_details=(__DEFAULT_AUTH_USER, __DEFAULT_AUTH_PASSWD),
                 auth_type=__DEFAULT_AUTH_TYPE,
                 use_ssl=__DEFAULT_USE_SSL,
                 verify_ssl=__DEFAULT_VERIFY_SSL,
                 buffer_size=__DEFAULT_BUFFER_SIZE,
                 flush_frequency_in_sec=__DEFAULT_FLUSH_FREQ_INSEC,
                 es_index_name=__DEFAULT_ES_INDEX_NAME,
                 index_name_frequency=__DEFAULT_INDEX_FREQUENCY,
                 es_doc_type=__DEFAULT_ES_DOC_TYPE,
                 es_additional_fields=__DEFAULT_ADDITIONAL_FIELDS,
                 raise_on_indexing_exceptions=__DEFAULT_RAISE_ON_EXCEPTION,
                 default_iso_timestamp_field_name=__DEFAULT_ISO_TIMESTAMP_FIELD_NAME,
                 default_timestamp_field_name=__DEFAULT_TIMESTAMP_FIELD_NAME):
        """ Handler constructor

        :param hosts: The list of hosts that elasticsearch clients will connect. The list can be provided
                    in the format ``[{'host':'host1','port':9200}, {'host':'host2','port':9200}]`` to
                    make sure the client supports failover of one of the insertion nodes
        :param auth_details: When ``CMRESHandler.AuthType.BASIC_AUTH`` is used this argument must contain
                    a tuple of strings with the user and password that will be used to authenticate against
                    the Elasticsearch servers, for example ``('User','Password')``
        :param auth_type: The authentication type to be used in the connection, a ``CMRESHandler.AuthType``.
                    Only NO_AUTH and BASIC_AUTH are handled by this copy; DEVOPS_AUTH raises ValueError
        :param use_ssl: A boolean that defines if the communications should use SSL encrypted communication
        :param verify_ssl: A boolean that defines if the SSL certificates are validated or not
        :param buffer_size: An int, Once this size is reached on the internal buffer results are flushed into ES
        :param flush_frequency_in_sec: A float representing how often and when the buffer will be flushed, even
                    if the buffer_size has not been reached yet
        :param es_index_name: A string with the name of the elasticsearch index that will be written to,
                    ``python_logger`` by default. The date suffix is disabled in this copy
        :param index_name_frequency: Selects the index-name function. Available values
                    are selected from the IndexNameFrequency class (IndexNameFrequency.DAILY,
                    IndexNameFrequency.WEEKLY, IndexNameFrequency.MONTHLY, IndexNameFrequency.YEARLY). By default
                    it uses DAILY; in this copy every choice returns the index name unchanged.
        :param es_doc_type: A string with the name of the document type that will be used, ``_doc``
                    by default
        :param es_additional_fields: A dictionary with all the additional fields that you would like to add
                    to the logs, such as the application, environment, etc.
        :param raise_on_indexing_exceptions: A boolean, True only for debugging purposes to raise exceptions
                    caused when indexing fails
        :param default_iso_timestamp_field_name: Stored on the handler but not used by the code shown here
        :param default_timestamp_field_name: Name of the document field that holds the record timestamp
        :return: A ready to be used CMRESHandler.
        """
        logging.Handler.__init__(self)

        self.hosts = hosts
        self.auth_details = auth_details
        self.auth_type = auth_type
        self.use_ssl = use_ssl
        self.verify_certs = verify_ssl

        self.buffer_size = buffer_size
        self.flush_frequency_in_sec = flush_frequency_in_sec
        self.es_index_name = es_index_name
        self.index_name_frequency = index_name_frequency
        self.es_doc_type = es_doc_type
        self.es_additional_fields = es_additional_fields.copy()
        self.es_additional_fields.update({'host': socket.gethostname(),
                                          'host_ip': get_local_ip()})

        # The original code below raised: socket.gaierror: [Errno 8] nodename nor servname provided, or not known
        # self.es_additional_fields.update({'host': socket.gethostname(),
        #                                   'host_ip': socket.gethostbyname(socket.gethostname())})

        self.raise_on_indexing_exceptions = raise_on_indexing_exceptions
        self.default_iso_timestamp_field_name = default_iso_timestamp_field_name
        self.default_timestamp_field_name = default_timestamp_field_name

        self._client = None
        self._buffer = []
        self._buffer_lock = Lock()
        self._timer = None
        self._index_name_func = CMRESHandler._INDEX_FREQUENCY_FUNCION_DICT[self.index_name_frequency]
        self.serializer = CMRESSerializer()

    def __schedule_flush(self):
        if self._timer is None:
            self._timer = Timer(self.flush_frequency_in_sec, self.flush)
            self._timer.daemon = True  # setDaemon() is deprecated
            self._timer.start()

    def __get_es_client(self):
        if self.auth_type == CMRESHandler.AuthType.NO_AUTH:
            if self._client is None:
                self._client = Elasticsearch(hosts=self.hosts,
                                             use_ssl=self.use_ssl,
                                             verify_certs=self.verify_certs,
                                             connection_class=RequestsHttpConnection,
                                             serializer=self.serializer)
            return self._client

        if self.auth_type == CMRESHandler.AuthType.BASIC_AUTH:
            if self._client is None:
                # Cache the client so it is not rebuilt on every flush
                self._client = Elasticsearch(hosts=self.hosts,
                                             http_auth=self.auth_details,
                                             use_ssl=self.use_ssl,
                                             verify_certs=self.verify_certs,
                                             connection_class=RequestsHttpConnection,
                                             serializer=self.serializer)
            return self._client

        raise ValueError("Authentication method not supported")

    def test_es_source(self):
        """ Returns True if the handler can ping the Elasticsearch servers

        :return: A boolean, True if the connection against the elasticsearch host was successful
        """
        return self.__get_es_client().ping()

    @staticmethod
    def __get_es_datetime_str(timestamp):
        """ Returns an elasticsearch-compatible time string for an epoch timestamp

        Note: the result is shifted to UTC+8 and carries no timezone suffix.

        :param timestamp: epoch, including milliseconds
        :return: A string valid for elasticsearch time record
        """

        current_date = datetime.datetime.utcfromtimestamp(timestamp) + datetime.timedelta(hours=8)
        # Zero-pad the milliseconds so that 5 ms is written as ".005", not ".5"
        return "{0!s}.{1:03d}".format(
            current_date.strftime('%Y-%m-%dT%H:%M:%S'),
            current_date.microsecond // 1000)

    def flush(self):
        """ Flushes the buffer into ES
        :return: None
        """
        if self._timer is not None and self._timer.is_alive():
            self._timer.cancel()
        self._timer = None

        if self._buffer:
            try:
                with self._buffer_lock:
                    logs_buffer = self._buffer
                    self._buffer = []
                actions = (
                    {
                        '_index': self._index_name_func.__func__(self.es_index_name),
                        '_type': self.es_doc_type,
                        '_source': log_record
                    }
                    for log_record in logs_buffer
                )
                eshelpers.bulk(
                    client=self.__get_es_client(),
                    actions=actions,
                    stats_only=True
                )
            except Exception as exception:
                if self.raise_on_indexing_exceptions:
                    raise exception

    def close(self):
        """ Flushes the buffer and releases any outstanding resources

        :return: None
        """
        if self._timer is not None:
            self.flush()
        self._timer = None

    def emit(self, record):
        """ Emit overrides the abstract logging.Handler logRecord emit method

        Formats and records the log

        :param record: A class of type ```logging.LogRecord```
        :return: None
        """
        self.format(record)
        # Add the custom fields you need. They are built on a per-record copy
        # so that the shared es_additional_fields dict is not mutated on every call.
        rec = self.es_additional_fields.copy()
        rec.update({
            'host': get_local_ip(),
            # 'service': 'spark-server', # settings.service_name
            'logLevel': record.levelname,
            'details': record.msg,
            'stackTrace': record.exc_info,
            'thread': record.threadName,
            'logger': record.pathname,
            'logData': record.exc_text
        })

        # Drop the fields you do not need: the original loop below copied every
        # remaining LogRecord attribute into the document, so it is commented out.
        # for key, value in record.__dict__.items():
        #     if key not in CMRESHandler.__LOGGING_FILTER_FIELDS:
        #         rec[key] = "" if value is None else value

        rec[self.default_timestamp_field_name] = self.__get_es_datetime_str(record.created)

        with self._buffer_lock:
            self._buffer.append(rec)

        if len(self._buffer) >= self.buffer_size:
            self.flush()
        else:
            self.__schedule_flush()
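
The buffering logic in `emit` and `flush` above can be tried without an Elasticsearch server. The following is only a minimal sketch: `BufferedListHandler` and its `sent_batches` list are hypothetical names invented for this illustration, and the list stands in for the bulk request that the real handler sends through `eshelpers.bulk`.

```python
import logging
import threading


class BufferedListHandler(logging.Handler):
    """Hypothetical stand-in for CMRESHandler: same buffering, no Elasticsearch."""

    def __init__(self, buffer_size=3, flush_frequency_in_sec=1.0):
        super().__init__()
        self.buffer_size = buffer_size
        self.flush_frequency_in_sec = flush_frequency_in_sec
        self.sent_batches = []  # stands in for the bulk requests sent to ES
        self._buffer = []
        self._buffer_lock = threading.Lock()
        self._timer = None

    def _schedule_flush(self):
        # Start a daemon timer only if none is pending.
        if self._timer is None:
            self._timer = threading.Timer(self.flush_frequency_in_sec, self.flush)
            self._timer.daemon = True
            self._timer.start()

    def flush(self):
        if self._timer is not None and self._timer.is_alive():
            self._timer.cancel()
        self._timer = None
        # Swap the buffer out under the lock, then "send" it outside the lock.
        with self._buffer_lock:
            batch, self._buffer = self._buffer, []
        if batch:
            self.sent_batches.append(batch)

    def emit(self, record):
        with self._buffer_lock:
            self._buffer.append(record.getMessage())
            is_full = len(self._buffer) >= self.buffer_size
        if is_full:
            self.flush()           # size-triggered flush
        else:
            self._schedule_flush()  # time-triggered flush


logger = logging.getLogger("buffer_demo")
logger.setLevel(logging.INFO)
logger.propagate = False
handler = BufferedListHandler(buffer_size=3, flush_frequency_in_sec=60)
logger.addHandler(handler)

for i in range(4):
    logger.info("message %d", i)

first_batches = list(handler.sent_batches)  # one full batch of three records
handler.flush()                             # push the leftover record
```

With `buffer_size=3`, the third record triggers a flush immediately, while the fourth waits for either the timer or an explicit `flush()`. This is why a short-lived script should call `flush()` or `close()` on the handler before exiting.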

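When customizing the written fields, note that `record.msg` is the unformatted message template and `record.exc_info` is a `(type, value, traceback)` tuple, which the serializer can only store through its `str()` fallback. The sketch below shows one possible way to build a readable document from a `LogRecord`. The helper `record_to_document`, the `CaptureHandler` class and the `service` field are hypothetical names used only for this illustration; they are not part of CMRESHandler.

```python
import logging


def record_to_document(record, extra_fields=None):
    """Build a JSON-friendly dict from a LogRecord (hypothetical helper)."""
    doc = dict(extra_fields or {})
    doc.update({
        "logLevel": record.levelname,
        "details": record.getMessage(),  # message with its %-style args applied
        "thread": record.threadName,
        "logger": record.pathname,
        "stackTrace": None,
    })
    if record.exc_info:
        # formatException turns the (type, value, traceback) tuple into text
        doc["stackTrace"] = logging.Formatter().formatException(record.exc_info)
    return doc


class CaptureHandler(logging.Handler):
    """Collects documents in memory instead of sending them anywhere."""

    def __init__(self):
        super().__init__()
        self.documents = []

    def emit(self, record):
        self.documents.append(record_to_document(record, {"service": "demo"}))


logger = logging.getLogger("fields_demo")
logger.setLevel(logging.DEBUG)
logger.propagate = False
capture = CaptureHandler()
logger.addHandler(capture)

logger.info("user %s logged in", "alice")
try:
    1 / 0
except ZeroDivisionError:
    logger.exception("division failed")

for doc in capture.documents:
    print(doc["logLevel"], doc["details"])
```

Adding a field then means adding a key to the dict, and removing a field means leaving its key out; the same idea applies to the `rec` dict built in `emit`.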