12-07 1 views
本文以 Django 框架的配置为例，核心功能沿用了 cmanaha 的代码，并针对自己的项目做了一些适配。
其原理是继承 logging.Handler 并重写 emit 方法，在其中实现将日志写入 ES 的逻辑。
个人认为其中设计得比较好的地方是：引入了多线程定时器和锁，用列表作为缓冲容器批量将日志写入 ES；这里用到了 ES 的 Bulk 接口，效率较高。
序列化类
1 2 3 4 5 6 7 8 9 10 11 |
#!/usr/bin/env python3
from elasticsearch.serializer import JSONSerializer


class CMRESSerializer(JSONSerializer):
    """JSON serializer that degrades to ``str()`` instead of failing.

    Any value the parent ``JSONSerializer.default`` rejects with a
    ``TypeError`` is encoded as its string representation.
    """

    def default(self, data):
        """Encode ``data`` with the parent serializer, or as ``str(data)``.

        :param data: A value the JSON encoder could not handle natively.
        :return: The parent's encoding of ``data``, or ``str(data)`` when the
            parent raises ``TypeError``.
        """
        try:
            encoded = super().default(data)
        except TypeError:
            # Unknown type: fall back to its textual representation.
            encoded = str(data)
        return encoded
获取本机IP方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# @Author : Eric Winn
# @Email : eng.eric.winn@gmail.com
# @Time : 2019/5/13 2:07 PM
# @Version : 1.0
# @File : utils
# @Software : PyCharm
import socket


def get_local_ip(probe_host='8.8.8.8', probe_port=80, fallback='127.0.0.1'):
    """
    Return the local IPv4 address the OS would use to reach ``probe_host``.

    ``connect()`` on a UDP socket sends no datagram; it only records the
    default peer, which makes the OS pick a route and a source address that
    ``getsockname()`` then reports.

    :param probe_host: Address used only for route selection (nothing is sent).
    :param probe_port: Port paired with ``probe_host``.
    :param fallback: Value returned when no route exists (e.g. an offline or
        sandboxed host) instead of propagating ``OSError``.
    :return: The local IP address as a dotted-quad string.
    """
    try:
        # The context manager guarantees the socket is closed on every path.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.connect((probe_host, probe_port))
            return sock.getsockname()[0]
    except OSError:
        return fallback
继承 logging.Handler 类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 |
#!/usr/bin/env python3
import logging
import datetime
import socket
from threading import Timer, Lock
from enum import Enum

from elasticsearch import helpers as eshelpers
from elasticsearch import Elasticsearch, RequestsHttpConnection

from .serializers import CMRESSerializer
from .utils import get_local_ip


class CMRESHandler(logging.Handler):
    """ Elasticsearch log handler

    Every emitted record is converted to a dict and appended to an in-memory
    buffer.  The buffer is written to Elasticsearch with ``helpers.bulk``
    either as soon as it holds ``buffer_size`` records, or when a daemon
    ``threading.Timer`` started by the first buffered record fires after
    ``flush_frequency_in_sec`` seconds.
    """

    class AuthType(Enum):
        """ Authentication types

        NO_AUTH and BASIC_AUTH are implemented by the handler.  DEVOPS_AUTH is
        declared but not implemented: requesting a client with it raises
        ValueError.
        """
        NO_AUTH = 0
        BASIC_AUTH = 1
        DEVOPS_AUTH = 2

    class IndexNameFrequency(Enum):
        """ Index rotation periods

        Selects the date suffix appended to the index prefix:
        - DAILY   -> YYYY.MM.DD
        - WEEKLY  -> YYYY.MM.DD of the Monday of the current week
        - MONTHLY -> YYYY.MM
        - YEARLY  -> YYYY
        """
        DAILY = 0
        WEEKLY = 1
        MONTHLY = 2
        YEARLY = 3

    # Defaults for the class
    __DEFAULT_ELASTICSEARCH_HOST = [{'host': 'localhost', 'port': 9200}]
    __DEFAULT_AUTH_USER = ''
    __DEFAULT_AUTH_PASSWD = ''
    __DEFAULT_USE_SSL = False
    __DEFAULT_VERIFY_SSL = True
    __DEFAULT_AUTH_TYPE = AuthType.NO_AUTH
    __DEFAULT_INDEX_FREQUENCY = IndexNameFrequency.DAILY
    __DEFAULT_BUFFER_SIZE = 1000
    __DEFAULT_FLUSH_FREQ_INSEC = 1
    __DEFAULT_ADDITIONAL_FIELDS = {}
    __DEFAULT_ES_INDEX_NAME = 'python_logger'
    __DEFAULT_ES_DOC_TYPE = 'python_log'
    __DEFAULT_RAISE_ON_EXCEPTION = False
    __DEFAULT_TIMESTAMP_FIELD_NAME = "timestamp"
    __DEFAULT_ISO_TIMESTAMP_FIELD_NAME = "iso_timestamp"

    # LogRecord attributes that are not copied into the Elasticsearch document.
    __LOGGING_FILTER_FIELDS = ['msecs', 'relativeCreated', 'levelno', 'created']

    @staticmethod
    def _get_daily_index_name(es_index_name):
        """ Returns the elasticsearch index name for today

        :param: es_index_name the prefix to be used in the index
        :return: A string ``<prefix>-YYYY.MM.DD`` using the local date.
        """
        return "{0!s}-{1!s}".format(es_index_name, datetime.datetime.now().strftime('%Y.%m.%d'))

    @staticmethod
    def _get_weekly_index_name(es_index_name):
        """ Returns the elasticsearch index name for the current week

        :param: es_index_name the prefix to be used in the index
        :return: A string ``<prefix>-YYYY.MM.DD`` where the date is the Monday
            of the current local week.
        """
        current_date = datetime.datetime.now()
        start_of_the_week = current_date - datetime.timedelta(days=current_date.weekday())
        return "{0!s}-{1!s}".format(es_index_name, start_of_the_week.strftime('%Y.%m.%d'))

    @staticmethod
    def _get_monthly_index_name(es_index_name):
        """ Returns the elasticsearch index name for the current month

        :param: es_index_name the prefix to be used in the index
        :return: A string ``<prefix>-YYYY.MM`` using the local date.
        """
        return "{0!s}-{1!s}".format(es_index_name, datetime.datetime.now().strftime('%Y.%m'))

    @staticmethod
    def _get_yearly_index_name(es_index_name):
        """ Returns the elasticsearch index name for the current year

        :param: es_index_name the prefix to be used in the index
        :return: A string ``<prefix>-YYYY`` using the local date.
        """
        return "{0!s}-{1!s}".format(es_index_name, datetime.datetime.now().strftime('%Y'))

    # Values are the raw staticmethod objects from the class body; flush()
    # calls them through ``__func__``.
    _INDEX_FREQUENCY_FUNCION_DICT = {
        IndexNameFrequency.DAILY: _get_daily_index_name,
        IndexNameFrequency.WEEKLY: _get_weekly_index_name,
        IndexNameFrequency.MONTHLY: _get_monthly_index_name,
        IndexNameFrequency.YEARLY: _get_yearly_index_name
    }

    def __init__(self,
                 hosts=__DEFAULT_ELASTICSEARCH_HOST,
                 auth_details=(__DEFAULT_AUTH_USER, __DEFAULT_AUTH_PASSWD),
                 auth_type=__DEFAULT_AUTH_TYPE,
                 use_ssl=__DEFAULT_USE_SSL,
                 verify_ssl=__DEFAULT_VERIFY_SSL,
                 buffer_size=__DEFAULT_BUFFER_SIZE,
                 flush_frequency_in_sec=__DEFAULT_FLUSH_FREQ_INSEC,
                 es_index_name=__DEFAULT_ES_INDEX_NAME,
                 index_name_frequency=__DEFAULT_INDEX_FREQUENCY,
                 es_doc_type=__DEFAULT_ES_DOC_TYPE,
                 es_additional_fields=__DEFAULT_ADDITIONAL_FIELDS,
                 raise_on_indexing_exceptions=__DEFAULT_RAISE_ON_EXCEPTION,
                 default_iso_timestamp_field_name=__DEFAULT_ISO_TIMESTAMP_FIELD_NAME,
                 default_timestamp_field_name=__DEFAULT_TIMESTAMP_FIELD_NAME):
        """ Handler constructor

        :param hosts: The list of hosts the elasticsearch client will connect to,
            in the format ``[{'host':'host1','port':9200}, {'host':'host2','port':9200}]``
            so the client can fail over between insertion nodes.
        :param auth_details: When ``CMRESHandler.AuthType.BASIC_AUTH`` is used, a
            tuple ``('User', 'Password')`` used to authenticate against the
            Elasticsearch servers.
        :param auth_type: A ``CMRESHandler.AuthType`` value. Only NO_AUTH and
            BASIC_AUTH are implemented; DEVOPS_AUTH makes client creation raise
            ValueError.
        :param use_ssl: A boolean that defines if the communications should use SSL.
        :param verify_ssl: A boolean that defines if SSL certificates are validated.
        :param buffer_size: An int; once the internal buffer holds this many
            records it is flushed into ES immediately.
        :param flush_frequency_in_sec: A float; delay after the first buffered
            record before a timed flush, even if ``buffer_size`` was not reached.
        :param es_index_name: The prefix of the elasticsearch index; a date suffix
            chosen by ``index_name_frequency`` is appended. ``python_logger`` by default.
        :param index_name_frequency: A ``CMRESHandler.IndexNameFrequency`` value
            (DAILY, WEEKLY, MONTHLY, YEARLY). Daily indices by default.
        :param es_doc_type: The document type sent as ``_type`` with every bulk
            action. ``python_log`` by default.
        :param es_additional_fields: A dict of extra fields added to every log
            document (e.g. application, environment). It is copied; ``host`` and
            ``host_ip`` are always set (overriding any values passed here).
        :param raise_on_indexing_exceptions: A boolean, True only for debugging;
            re-raises exceptions that occur while indexing records into
            Elasticsearch instead of silently dropping the batch.
        :param default_iso_timestamp_field_name: Stored on the handler; not used
            by any method of this class.
        :param default_timestamp_field_name: Name of the document field that
            receives the formatted record creation time.
        :return: A ready to be used CMRESHandler.
        """
        logging.Handler.__init__(self)

        self.hosts = hosts
        self.auth_details = auth_details
        self.auth_type = auth_type
        self.use_ssl = use_ssl
        self.verify_certs = verify_ssl
        self.buffer_size = buffer_size
        self.flush_frequency_in_sec = flush_frequency_in_sec
        self.es_index_name = es_index_name
        self.index_name_frequency = index_name_frequency
        self.es_doc_type = es_doc_type
        # Copy so the shared class-level default dict is never mutated.
        self.es_additional_fields = es_additional_fields.copy()
        self.es_additional_fields.update({'host': socket.gethostname(),
                                          'host_ip': get_local_ip()})
        self.raise_on_indexing_exceptions = raise_on_indexing_exceptions
        self.default_iso_timestamp_field_name = default_iso_timestamp_field_name
        self.default_timestamp_field_name = default_timestamp_field_name

        self._client = None
        self._buffer = []
        self._buffer_lock = Lock()
        self._timer = None
        self._index_name_func = CMRESHandler._INDEX_FREQUENCY_FUNCION_DICT[self.index_name_frequency]
        self.serializer = CMRESSerializer()

    def __schedule_flush(self):
        """ Start the flush timer if none is pending """
        if self._timer is None:
            self._timer = Timer(self.flush_frequency_in_sec, self.flush)
            # Daemon so a pending timer never keeps the interpreter alive.
            self._timer.daemon = True
            self._timer.start()

    def __get_es_client(self):
        """ Returns the cached Elasticsearch client, creating it on first use

        :raises ValueError: if ``auth_type`` is not NO_AUTH or BASIC_AUTH.
        """
        # NOTE(review): RequestsHttpConnection / use_ssl belong to the
        # elasticsearch-py < 8 API — confirm the installed client version.
        if self.auth_type == CMRESHandler.AuthType.NO_AUTH:
            if self._client is None:
                self._client = Elasticsearch(hosts=self.hosts,
                                             use_ssl=self.use_ssl,
                                             verify_certs=self.verify_certs,
                                             connection_class=RequestsHttpConnection,
                                             serializer=self.serializer)
            return self._client

        if self.auth_type == CMRESHandler.AuthType.BASIC_AUTH:
            if self._client is None:
                # Cache the client; previously a new one was built per call.
                self._client = Elasticsearch(hosts=self.hosts,
                                             http_auth=self.auth_details,
                                             use_ssl=self.use_ssl,
                                             verify_certs=self.verify_certs,
                                             connection_class=RequestsHttpConnection,
                                             serializer=self.serializer)
            return self._client

        raise ValueError("Authentication method not supported")

    def test_es_source(self):
        """ Returns True if the handler can ping the Elasticsearch servers

        :return: A boolean, True if the connection against elasticsearch host was successful
        """
        return self.__get_es_client().ping()

    @staticmethod
    def __get_es_datetime_str(timestamp):
        """ Formats an epoch timestamp as UTC+8 wall-clock time

        :param timestamp: epoch seconds, including the fractional part
        :return: A string ``YYYY-MM-DDTHH:MM:SS.ffffff`` with no zone designator.
        """
        # NOTE(review): the +8h shift is a project adaptation (presumably
        # China Standard Time). Elasticsearch reads dates without an offset as
        # UTC — confirm consumers expect this.
        utc_plus_8 = datetime.timezone(datetime.timedelta(hours=8))
        current_date = datetime.datetime.fromtimestamp(timestamp, tz=utc_plus_8)
        # Microseconds must be zero-padded: 5000 us is ".005000", not ".5000".
        return "{0!s}.{1:06d}".format(current_date.strftime('%Y-%m-%dT%H:%M:%S'),
                                      current_date.microsecond)

    def flush(self):
        """ Flushes the buffer into ES

        Errors during indexing drop the batch unless
        ``raise_on_indexing_exceptions`` is True, in which case they propagate.

        :return: None
        """
        if self._timer is not None and self._timer.is_alive():
            self._timer.cancel()
        self._timer = None

        if self._buffer:
            try:
                # Swap the buffer out under the lock; the network I/O below
                # runs without holding it.
                with self._buffer_lock:
                    logs_buffer = self._buffer
                    self._buffer = []
                actions = (
                    {
                        '_index': self._index_name_func.__func__(self.es_index_name),
                        # NOTE(review): mapping types are deprecated in ES 7 and
                        # removed in ES 8 — confirm the target cluster version.
                        '_type': self.es_doc_type,
                        '_source': log_record
                    }
                    for log_record in logs_buffer
                )
                eshelpers.bulk(
                    client=self.__get_es_client(),
                    actions=actions,
                    stats_only=True
                )
            except Exception:
                if self.raise_on_indexing_exceptions:
                    raise

    def close(self):
        """ Flushes pending records and releases handler resources

        :return: None
        """
        try:
            # flush() cancels any pending timer and is a no-op on an empty buffer.
            self.flush()
        finally:
            self._timer = None
            logging.Handler.close(self)

    def emit(self, record):
        """ Emit overrides the abstract logging.Handler logRecord emit method

        Format and buffer the record; flush when the buffer is full,
        otherwise make sure a timed flush is scheduled.

        :param record: A class of type ```logging.LogRecord```
        :return: None
        """
        # Called for its side effect: it populates record.message / asctime.
        self.format(record)

        rec = self.es_additional_fields.copy()
        for key, value in record.__dict__.items():
            if key not in CMRESHandler.__LOGGING_FILTER_FIELDS:
                rec[key] = "" if value is None else value
        rec[self.default_timestamp_field_name] = self.__get_es_datetime_str(record.created)

        with self._buffer_lock:
            self._buffer.append(rec)
            buffer_full = len(self._buffer) >= self.buffer_size

        # flush() acquires _buffer_lock itself and threading.Lock is not
        # re-entrant, so it must only be called after the lock is released.
        if buffer_full:
            self.flush()
        else:
            self.__schedule_flush()
Django配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# The enum values referenced below need the handler class itself in scope.
from cmdb.log.handlers import CMRESHandler

# NOTE(review): CONFIG is expected to be defined earlier in this settings module.
ELASTICSEARCH_HOST = CONFIG.ELASTICSEARCH_HOST
ELASTICSEARCH_PORT = CONFIG.ELASTICSEARCH_PORT

# Django hands this dict to logging.config.dictConfig().
LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'default': {
            'format': '[%(levelname)s][%(asctime)s] [task_id:%(name)s] [%(filename)s:%(lineno)d]%(message)s'
        },
    },
    'filters': {
        'require_debug_true': {
            '()': 'django.utils.log.RequireDebugTrue',
        },
    },
    'handlers': {
        # Console output is only produced while settings.DEBUG is True.
        'console': {
            'level': 'DEBUG',
            'filters': ['require_debug_true'],
            'class': 'logging.StreamHandler',
            'formatter': 'default'
        },
        # Keys other than class/level/formatter/filters are passed to the
        # CMRESHandler constructor as keyword arguments.
        'elasticsearch': {
            'level': 'DEBUG',
            'class': 'cmdb.log.handlers.CMRESHandler',
            'hosts': [{'host': ELASTICSEARCH_HOST, 'port': ELASTICSEARCH_PORT}],
            'es_index_name': CONFIG.LOG_INDEX,
            'index_name_frequency': CMRESHandler.IndexNameFrequency.MONTHLY,
            'auth_type': CMRESHandler.AuthType.BASIC_AUTH,
            'auth_details': (CONFIG.ELASTICSEARCH_USER, CONFIG.ELASTICSEARCH_PASSWORD),
            # Optional: delay in seconds before a timed flush (handler default: 1).
            # 'flush_frequency_in_sec': 10,
            'use_ssl': False
        }
    },
    'loggers': {
        # Root logger: console only.
        "": {
            'handlers': ['console'],
            'level': 'DEBUG',
        },
        # Records from the 'Login' logger (and its children) are shipped to
        # Elasticsearch.
        'Login': {
            'handlers': ['elasticsearch'],
            'level': 'DEBUG',
        }
    },
}
如果想打赏，可以用微信扫描下面的二维码：一来能激发我写博客的动力，二来可以帮忙分担云主机的费用。另外再次注明博客原地址：itnotebooks.com，感谢！