def __init__(
    self,
    facility: Optional[int]=syslog.LOG_USER,
    host: Optional[str]='localhost',
    port: Optional[int]=514,
    encoding: Optional[str]='utf8',
    name: Optional[str]=None,
    level: Optional[LogLevel]=None
):
    """Create a new ``SyslogHandler`` backed by an IPv4 datagram socket.

    :param facility: the syslog facility; stored on the instance
    :param host: the hostname of the syslog server
    :param port: the port of the syslog server
    :param encoding: the message encoding
    :param name: the name of the handler
    :param level: the minimum verbosity level to write log entries
    """
    # The facility is recorded before the parent initializer runs.
    self.facility = facility
    # Everything except name/level describes the socket the parent sets up.
    socket_options = dict(
        host=host,
        port=port,
        family=socket.AF_INET,
        type=socket.SOCK_DGRAM,
        encoding=encoding,
    )
    super().__init__(name=name, level=level, **socket_options)
# Example source code using Python's LOG_USER
def confirm_leave(self, owner, level):
    """Map ``owner`` and a textual ``level`` to a syslog facility and severity.

    :param owner: name of the log owner; ``'ecms_troubleshoot'`` forces the
        severity to ``syslog.LOG_EMERG``
    :param level: one of ``'INFO'``, ``'WARN'``, ``'ERROR'`` or ``'DEBUG'``;
        any other value falls back to ``syslog.LOG_INFO``
    :return: a ``(facility, severity)`` tuple; the facility is always
        ``syslog.LOG_USER``
    """
    facility = syslog.LOG_USER
    # Translate the textual level names into syslog severities.
    level_info = dict(INFO=syslog.LOG_INFO, WARN=syslog.LOG_WARNING,
                      ERROR=syslog.LOG_ERR, DEBUG=syslog.LOG_DEBUG)
    # Unknown levels keep the default severity.
    severity = level_info.get(level, syslog.LOG_INFO)
    # Special case: this owner is always reported as an emergency.
    # NOTE: the constant lives on the module (``syslog.LOG_EMERG``);
    # ``syslog.syslog`` is a function and has no such attribute.
    if owner in ('ecms_troubleshoot',):
        severity = syslog.LOG_EMERG
    return facility, severity
def __init__(self, program_name, facility=None):
    """Initialise the handler and open the syslog connection.

    :param program_name: identifier passed to ``syslog.openlog``
    :param facility: syslog facility; ``syslog.LOG_USER`` is used when ``None``
    :raises RuntimeError: if the ``syslog`` module is not available
    """
    # Check availability before touching any attribute of ``syslog``:
    # when the module is missing the name is falsy, and reading
    # ``syslog.LOG_USER`` first would raise AttributeError instead of the
    # intended RuntimeError.
    if not syslog:
        raise RuntimeError("Syslog not available on this platform")
    # Default values always get evaluated, for which reason we avoid
    # using 'syslog' directly in the signature, which may not be available.
    if facility is None:
        facility = syslog.LOG_USER
    super(SyslogHandler, self).__init__()
    syslog.openlog(program_name, 0, facility)
def test_find_facility(self):
    """Check ``_find_facility`` against names in mixed case, with and
    without the ``LOG_`` prefix."""
    expectations = (
        (syslog.LOG_USER, "user"),
        (syslog.LOG_LOCAL1, "log_local1"),
        (syslog.LOG_LOCAL2, "LOG_local2"),
        (syslog.LOG_LOCAL3, "LOG_LOCAL3"),
        (syslog.LOG_LOCAL4, "LOCaL4"),
    )
    for expected_facility, facility_name in expectations:
        self.assertEqual(expected_facility,
                         output.Syslog._find_facility(facility_name))
def _check_arguments(self, check_invalid_arguments):
    """Consume the internal ``_ansible_*`` parameters and flag unknown ones.

    Resets ``self._syslog_facility`` to ``'LOG_USER'``, copies each
    recognised internal parameter from ``self.params`` onto the instance,
    removes every ``_ansible_``-prefixed key from ``self.params``, and calls
    ``self.exit_json`` when check mode is requested but not supported.

    :param check_invalid_arguments: when true, ``self.fail_json`` is called
        for each unhandled key that is not in ``self._legal_inputs``
    """
    self._syslog_facility = 'LOG_USER'

    # Internal keys whose value goes through ``self.boolean`` first.
    coerced_attrs = {
        '_ansible_no_log': 'no_log',
        '_ansible_debug': '_debug',
        '_ansible_diff': '_diff',
    }
    # Internal keys whose value is stored unchanged.
    verbatim_attrs = {
        '_ansible_verbosity': '_verbosity',
        '_ansible_selinux_special_fs': '_selinux_special_fs',
        '_ansible_syslog_facility': '_syslog_facility',
        '_ansible_version': 'ansible_version',
        '_ansible_module_name': '_name',
    }

    # Walk a snapshot, because internal keys are deleted along the way.
    for key, value in list(self.params.items()):
        if key == '_ansible_check_mode' and value:
            self.check_mode = True
        elif key in coerced_attrs:
            setattr(self, coerced_attrs[key], self.boolean(value))
        elif key in verbatim_attrs:
            setattr(self, verbatim_attrs[key], value)
        elif check_invalid_arguments and key not in self._legal_inputs:
            self.fail_json(msg="unsupported parameter for module: %s" % key)

        # Internal parameters never stay in the params dict.
        if key.startswith('_ansible_'):
            del self.params[key]

    if self.check_mode and not self.supports_check_mode:
        self.exit_json(skipped=True, msg="remote module (%s) does not support check mode" % self._name)
def _log_to_syslog(self, msg):
    """Write ``msg`` to syslog at ``LOG_INFO`` priority.

    Does nothing when ``HAS_SYSLOG`` is false. The log identifier is
    ``ansible-<self._name>``.

    :param msg: the message handed to ``syslog.syslog``
    """
    if not HAS_SYSLOG:
        return
    ident = str('ansible-%s' % self._name)
    # Resolve the configured facility name on the syslog module, falling
    # back to LOG_USER when no attribute of that name exists.
    facility = getattr(syslog, self._syslog_facility, syslog.LOG_USER)
    syslog.openlog(ident, 0, facility)
    syslog.syslog(syslog.LOG_INFO, msg)
def test_create_name(self):
    """The handler's name must be ``syslog-`` followed by the ``LOG_USER`` value."""
    expected_name = 'syslog-{}'.format(syslog.LOG_USER)
    assert self.handler.name == expected_name
def handle(self):
    """Decode the received payload, log it, and bump the handler's counter.

    The payload is read from ``self.request[0]``, stripped, decoded to text
    and logged through ``SyslogUDPHandler.nslogger`` with the ``LOG_USER``
    facility at ``LOG_INFO`` severity.
    """
    payload = self.request[0].strip()
    message = bytes.decode(payload)
    SyslogUDPHandler.nslogger.log(
        syslog.LOG_USER, syslog.LOG_INFO, str(message), pid=False)
    # Class-level count of handled messages.
    SyslogUDPHandler.count += 1
def _log_to_syslog(self, msg):
    """Write ``msg`` to syslog at ``LOG_INFO`` priority.

    Does nothing when ``HAS_SYSLOG`` is false. The log identifier is
    ``ansible-<self._name>``.

    :param msg: the message handed to ``syslog.syslog``
    """
    if not HAS_SYSLOG:
        return
    ident = str('ansible-%s' % self._name)
    # Resolve the configured facility name on the syslog module, falling
    # back to LOG_USER when no attribute of that name exists.
    facility = getattr(syslog, self._syslog_facility, syslog.LOG_USER)
    syslog.openlog(ident, 0, facility)
    syslog.syslog(syslog.LOG_INFO, msg)
def __init__(self):
    """Set the default facility (``LOG_USER``) and severity (``LOG_INFO``)."""
    self.facility, self.severity = syslog.LOG_USER, syslog.LOG_INFO
def __init__(self):
    """Set the defaults: ``LOG_USER`` facility, ``LOG_INFO`` severity and
    an empty message."""
    defaults = (
        ('facility', syslog.LOG_USER),
        ('severity', syslog.LOG_INFO),
        ('message', ""),
    )
    for attribute, value in defaults:
        setattr(self, attribute, value)
def _check_arguments(self, check_invalid_arguments, spec=None, param=None, legal_inputs=None):
    """Consume the internal ``_ansible_*`` parameters and report unknown ones.

    Resets ``self._syslog_facility`` to ``'LOG_USER'``, copies each
    recognised internal parameter found in ``param`` onto the instance,
    removes ``_ansible_``-prefixed keys from ``self.params``, reports all
    unsupported parameters in a single ``self.fail_json`` call, and calls
    ``self.exit_json`` when check mode is requested but not supported.

    :param check_invalid_arguments: when true, unhandled keys that are not
        in ``legal_inputs`` are collected and reported
    :param spec: argument spec whose keys are listed as the supported
        parameters; defaults to ``self.argument_spec``
    :param param: mapping of parameters to inspect; defaults to
        ``self.params``
    :param legal_inputs: accepted parameter names; defaults to
        ``self._legal_inputs``
    """
    self._syslog_facility = 'LOG_USER'
    unsupported_parameters = set()
    if spec is None:
        spec = self.argument_spec
    if param is None:
        param = self.params
    if legal_inputs is None:
        legal_inputs = self._legal_inputs

    # Iterate over a snapshot: internal keys are removed while looping.
    for (k, v) in list(param.items()):

        if k == '_ansible_check_mode' and v:
            self.check_mode = True
        elif k == '_ansible_no_log':
            self.no_log = self.boolean(v)
        elif k == '_ansible_debug':
            self._debug = self.boolean(v)
        elif k == '_ansible_diff':
            self._diff = self.boolean(v)
        elif k == '_ansible_verbosity':
            self._verbosity = v
        elif k == '_ansible_selinux_special_fs':
            self._selinux_special_fs = v
        elif k == '_ansible_syslog_facility':
            self._syslog_facility = v
        elif k == '_ansible_version':
            self.ansible_version = v
        elif k == '_ansible_module_name':
            self._name = v
        elif k == '_ansible_socket':
            self._socket_path = v
        elif check_invalid_arguments and k not in legal_inputs:
            unsupported_parameters.add(k)

        # clean up internal params:
        # ``param`` may be a different mapping than ``self.params``, so the
        # key is not guaranteed to exist there; pop() avoids a KeyError.
        if k.startswith('_ansible_'):
            self.params.pop(k, None)

    if unsupported_parameters:
        msg = "Unsupported parameters for (%s) module: %s" % (self._name, ','.join(sorted(unsupported_parameters)))
        if self._options_context:
            msg += " found in %s." % " -> ".join(self._options_context)
        msg += " Supported parameters include: %s" % (','.join(sorted(spec.keys())))
        self.fail_json(msg=msg)
    if self.check_mode and not self.supports_check_mode:
        self.exit_json(skipped=True, msg="remote module (%s) does not support check mode" % self._name)