def executeCheck():
for ssid in config['checks']['ssids'].keys():
sanity = checkSSID(config['checks']['ssids'][ssid]['name'], config['checks']['ssids'][ssid]['encrypted'], config, BASE_DIR + '/log')
id = writeCheck(db_conn=db_connection, sanity=sanity, timeouts={'conn' : config['checks']['failed_conn'], 'dhcp' : config['checks']['failed_dhcp']}, location=config['computer']['location'])
try:
file = open(BASE_DIR + '/log/' + ssid + '.tmplog', 'r')
except:
pass
else:
try:
gfile = open(BASE_DIR + '/log/' + ssid + '.log', 'a')
except:
file.close()
else:
gfile.write('==== SSID: ' + ssid + ' == ' + datetime.datetime.fromtimestamp(int(sanity['time_start'])).strftime('%Y-%m-%d %H:%M:%S') + ' == DB ID: ' + str(id) + '\n')
gfile.write(file.read())
gfile.close()
file.close()
print(sanity)
syslog(LOG_INFO, sanity.__str__())
# excecution
python类LOG_INFO的实例源码
def confirm_leave(self, owner, level):
"""
??????owner,level???????facility,severity
"""
# ?????
facility = syslog.LOG_USER
severity = syslog.LOG_INFO
# as??????syslog????
level_info = dict(INFO=syslog.LOG_INFO, WARN=syslog.LOG_WARNING,
ERROR=syslog.LOG_ERR, DEBUG=syslog.LOG_DEBUG)
if level in level_info.keys():
severity = level_info[level]
# ????
if owner in ['ecms_troubleshoot']:
severity = syslog.syslog.LOG_EMERG
return facility, severity
def pam_sm_setcred( pamh, flags, argv ):
""" pam_sm_setcred """
syslog.syslog( syslog.LOG_INFO,
"Please note: pam_linotp does not support setcred" )
return pamh.PAM_CRED_UNAVAIL
def pam_sm_acct_mgmt( pamh, flags, argv ):
""" pam_sm_acct_mgmt """
syslog.syslog( syslog.LOG_INFO,
"Please note: pam_linotp does not support acct_mgmt" )
return pamh.PAM_SERVICE_ERR
def pam_sm_chauthtok( pamh, flags, argv ):
""" pam_sm_chauthtok """
syslog.syslog( syslog.LOG_INFO,
"Please note: pam_linotp does not support chauthtok" )
return pamh.PAM_SERVICE_ERR
def pam_sm_open_session( pamh, flags, argv ):
""" pam_sm_open_session """
syslog.syslog( syslog.LOG_INFO,
"Please note: pam_linotp does not support open_session" )
return pamh.PAM_SERVICE_ERR
def pam_sm_close_session( pamh, flags, argv ):
""" pam_sm_close_session """
syslog.syslog( syslog.LOG_INFO,
"Please note: pam_linotp does not support close_session" )
return pamh.PAM_SERVICE_ERR
##eof##########################################################################
def __init__(self, message_dict={}):
self.message_dict = dict(message_dict[utils.MESSAGE])
self.timestamp = arrow.get(self.message_dict.get("timestamp", None))
self.level = self.message_dict.get("level", syslog.LOG_INFO)
self.message = self.message_dict.get(utils.MESSAGE, "")
def test_format_colored_with_level_info(self):
self.message.level = syslog.LOG_INFO
log = TailFormatter('({source}) - {message}', color=True).format(self.message)
self.assertEquals(colored('(dummy.source) - dummy message', 'green'), log)
def test_get_log_level_from_code(self):
self.assertEquals('CRITICAL', LogLevel.find_by_syslog_code(syslog.LOG_CRIT)['name'])
self.assertEquals('WARNING', LogLevel.find_by_syslog_code(syslog.LOG_WARNING)['name'])
self.assertEquals('DEBUG', LogLevel.find_by_syslog_code(syslog.LOG_DEBUG)['name'])
self.assertEquals('INFO', LogLevel.find_by_syslog_code(syslog.LOG_INFO)['name'])
self.assertEquals('ERROR', LogLevel.find_by_syslog_code(syslog.LOG_ERR)['name'])
self.assertEquals('NOTICE', LogLevel.find_by_syslog_code(syslog.LOG_NOTICE)['name'])
self.assertEquals('', LogLevel.find_by_syslog_code(9999)['name'])
def test_get_log_level_code(self):
self.assertEquals(syslog.LOG_CRIT, LogLevel.find_by_level_name('CRITICAL'))
self.assertEquals(syslog.LOG_WARNING, LogLevel.find_by_level_name('WARNING'))
self.assertEquals(syslog.LOG_DEBUG, LogLevel.find_by_level_name('DEBUG'))
self.assertEquals(syslog.LOG_INFO, LogLevel.find_by_level_name('INFO'))
self.assertEquals(syslog.LOG_ERR, LogLevel.find_by_level_name('ERROR'))
self.assertEquals(syslog.LOG_NOTICE, LogLevel.find_by_level_name('NOTICE'))
self.assertIsNone(LogLevel.find_by_level_name('UNKNOWN'))
def emit(self, record):
priority = self.priority_map.get(record.levelno, syslog.LOG_INFO)
message = self.format(record)
if isinstance(message, unicode):
message = message.encode('UTF-8')
for line in message.rstrip().split('\n'):
syslog.syslog(priority, line)
# noinspection PyMethodMayBeStatic
def loginf(msg):
logmsg(syslog.LOG_INFO, msg)
def loginf(msg):
logmsg(syslog.LOG_INFO, msg)
def as_syslog(self) -> int:
"""Translates the internal library value to its corresponding Syslog value.
:returns: the corresponding Syslog value
"""
if self == LogLevel.debug:
return syslog.LOG_DEBUG
elif self == LogLevel.info:
return syslog.LOG_INFO
elif self == LogLevel.warning:
return syslog.LOG_WARNING
else:
return syslog.LOG_ERR
def test_syslog_eq():
assert LogLevel.debug.as_syslog == syslog.LOG_DEBUG
assert LogLevel.info.as_syslog == syslog.LOG_INFO
assert LogLevel.warning.as_syslog == syslog.LOG_WARNING
assert LogLevel.error.as_syslog == syslog.LOG_ERR
def handle(self):
data = bytes.decode(self.request[0].strip())
#socket = self.request[1]
#print( "%s : " % self.client_address[0], str(data))
#logging.info(str(data))
SyslogUDPHandler.nslogger.log(syslog.LOG_USER, syslog.LOG_INFO, str(data), pid=False)
SyslogUDPHandler.count = SyslogUDPHandler.count + 1
def loginf(msg):
logmsg(syslog.LOG_INFO, msg)
def disconnectWiFi(interface, defaults):
assert type(interface) == type('a')
assert type(defaults) == type({})
proc = subprocess.Popen(['killall', 'wpa_supplicant'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
syslog(LOG_INFO, decodeUTF8(proc.communicate()))
proc = subprocess.Popen(['dhclient', '-r', interface], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
syslog(LOG_INFO, decodeUTF8(proc.communicate()))
proc = subprocess.Popen(['ip', 'link', 'set', interface, 'down'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
syslog(LOG_INFO, decodeUTF8(proc.communicate()))
confDefaultGW(defaults['interface'], defaults['gateway'])
return True
def initializeInterface(interface):
proc = subprocess.Popen(['ip', 'link', 'set', interface, 'up' ], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
syslog(LOG_INFO, decodeUTF8(proc.communicate()))
sleep(4)
proc = subprocess.Popen(['iw', 'dev', interface, 'scan' ], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
syslog(LOG_INFO, decodeUTF8(proc.communicate()))
return True
def killPID(pid):
assert type(pid) == type(42)
proc = subprocess.Popen(['kill', '-9', str(pid)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
syslog(LOG_INFO, decodeUTF8(proc.communicate()))
return True
def checkIP(gw):
proc = subprocess.Popen(['ip', 'a'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
out = proc.communicate()
out = decodeUTF8(out)
syslog(LOG_INFO, out)
if ''.join(['inet ', gw[0:4]]) in out:
return True
else:
return False
def doPingAvr(target, interface, count):
proc = subprocess.Popen(['ping', ''.join(['-c', str(count)]), '-I', interface, target], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
out = proc.communicate()
out = decodeUTF8(out)
syslog(LOG_INFO, out)
try:
out = out.split(' = ')[1]
out = out.split(' ms')[0]
out = out.split('/')[1]
except:
return 0
return out
def getBSSID(interface):
proc = subprocess.Popen(['iw', 'dev', interface, 'link'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
out = proc.communicate()
out = decodeUTF8(out)
syslog(LOG_INFO, out)
try:
out = out.split('Connected to ')[1]
out = out.split(' (on ')[0]
except IndexError as e:
syslog(LOG_INFO, e.value)
return out
def confDefaultGW(interface, gw):
proc = subprocess.Popen(['ip', 'route', 'del', 'default'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
syslog(LOG_INFO, decodeUTF8(proc.communicate()))
proc = subprocess.Popen(['ip', 'route', 'add', 'default', 'via', gw, 'dev', interface], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
syslog(LOG_INFO, decodeUTF8(proc.communicate()))
return True
def loginf(msg):
logmsg(syslog.LOG_INFO, msg)
def loginf(msg):
logmsg(syslog.LOG_INFO, msg)
def test_emitCustomFacility(self):
"""
L{SyslogObserver.emit} uses the value of the C{'syslogPriority'} as the
syslog priority, if that key is present in the event dictionary.
"""
self.observer.emit({
'message': ('hello, world',), 'isError': False, 'system': '-',
'syslogFacility': stdsyslog.LOG_CRON})
self.assertEqual(
self.events,
[(stdsyslog.LOG_INFO | stdsyslog.LOG_CRON, '[-] hello, world')])
def test_emitCustomSystem(self):
"""
L{SyslogObserver.emit} uses the value of the C{'system'} key to prefix
the logged message.
"""
self.observer.emit({'message': ('hello, world',), 'isError': False,
'system': 'nonDefaultSystem'})
self.assertEqual(
self.events,
[(stdsyslog.LOG_INFO, "[nonDefaultSystem] hello, world")])
def test_emitMessage(self):
"""
L{SyslogObserver.emit} logs the value of the C{'message'} key of the
event dictionary it is passed to the syslog.
"""
self.observer.emit({
'message': ('hello, world',), 'isError': False,
'system': '-'})
self.assertEqual(
self.events,
[(stdsyslog.LOG_INFO, "[-] hello, world")])