def handleHeader(self, key, value):
    logging.log(self.getLogLevel(), "Got server header: %s:%s" % (key, value))

    if (key.lower() == 'location'):
        value = self.replaceSecureLinks(value)

    if (key.lower() == 'content-type'):
        if (value.find('image') != -1):
            self.isImageRequest = True
            logging.debug("Response is image content, not scanning...")

    if (key.lower() == 'content-encoding'):
        if (value.find('gzip') != -1):
            logging.debug("Response is compressed...")
            self.isCompressed = True
    elif (key.lower() == 'content-length'):
        self.contentLength = value
    elif (key.lower() == 'set-cookie'):
        self.client.responseHeaders.addRawHeader(key, value)
    else:
        self.client.setHeader(key, value)
def handleHeader(self, key, value):
    logging.log(self.getLogLevel(), "Got server header: %s:%s" % (key, value))

    if (key.lower() == 'location'):
        value = self.replaceSecureLinks(value)

    if (key.lower() == 'content-type'):
        if (value.find('image') != -1):
            self.isImageRequest = True
            logging.debug("Response is image content, not scanning...")

    if (key.lower() == 'content-encoding'):
        if (value.find('gzip') != -1):
            logging.debug("Response is compressed...")
            self.isCompressed = True
    elif (key.lower() == 'content-length'):
        self.contentLength = value
    elif (key.lower() == 'set-cookie'):
        self.client.responseHeaders.addRawHeader(key, value)
    elif (key.lower() == 'strict-transport-security'):
        logging.log(self.getLogLevel(), "LEO Erasing Strict Transport Security....")
    else:
        self.client.setHeader(key, value)
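The getLogLevel() helper is not part of these snippets; a minimal sketch of how a dynamic level could be chosen is below. The verbose flag and the INFO/DEBUG choice are assumptions for illustration, not taken from the original project.

def getLogLevel(self):
    # Hypothetical: surface server-header chatter at INFO when a verbose
    # flag is set, otherwise keep it at DEBUG.
    return logging.INFO if getattr(self, 'verbose', False) else logging.DEBUG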
def teardown_integration_test(self):
    """
    Stops the broker running in the separate process.
    :return:
    """
    try:
        stop_broker()
        if self.broker_process is not None:
            self.broker_process.join(timeout=_BROKER_SHUTDOWN_TIMEOUT_TIME)
    except Exception as _:
        logging.log(msg="Broker didn't shut down. Killing broker process.", level=logging.WARNING)
        self.broker_process.terminate()
        self.broker_process.join(timeout=_BROKER_SHUTDOWN_TIMEOUT_TIME)
        if self.broker_process.is_alive():
            logging.log(msg="Broker won't terminate. Integration test exiting.", level=logging.ERROR)
            sys.exit(1)
    self.broker_process = None
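The matching setup is not shown here; a minimal sketch of what it might look like follows. run_broker and the timeout value are assumptions made for illustration.

import logging
import multiprocessing

_BROKER_SHUTDOWN_TIMEOUT_TIME = 5  # seconds; value assumed for illustration

def setup_integration_test(self):
    # Hypothetical counterpart to teardown_integration_test above:
    # run_broker is an assumed entry point that serves until stop_broker() is called.
    self.broker_process = multiprocessing.Process(target=run_broker)
    self.broker_process.start()
    logging.log(logging.INFO, "Broker started in process %d", self.broker_process.pid)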
def click(self, level=None):
    try:
        resp = self.sess.get(self.coupon_url, timeout=5)
        if level != None:
            soup = bs4.BeautifulSoup(resp.text, "html.parser")
            tag1 = soup.select('title')
            tag2 = soup.select('div.content')
            if len(tag2):
                logging.log(level, u'{}'.format(tag2[0].text.strip(' \t\r\n')))
            else:
                if len(tag1):
                    logging.log(level, u'{}'.format(tag1[0].text.strip(' \t\r\n')))
                else:
                    logging.log(level, u'????')
    except Exception as e:
        if level != None:
            logging.log(level, 'Exp {0} : {1}'.format(FuncName(), e))
        return 0
    else:
        return 1
def kill_ports(ports):
    for port in ports:
        log('kill %s start' % port)
        popen = subprocess.Popen('lsof -i:%s' % port, shell=True, stdout=subprocess.PIPE)
        (data, err) = popen.communicate()
        log('data:\n%s \nerr:\n%s' % (data, err))
        pattern = re.compile(r'\b\d+\b', re.S)
        pids = re.findall(pattern, data)
        log('pids:%s' % str(pids))
        for pid in pids:
            if pid != '' and pid != None:
                try:
                    log('pid:%s' % pid)
                    popen = subprocess.Popen('kill -9 %s' % pid, shell=True, stdout=subprocess.PIPE)
                    (data, err) = popen.communicate()
                    log('data:\n%s \nerr:\n%s' % (data, err))
                except Exception as e:
                    log('kill_ports exception:%s' % e)
        log('kill %s finish' % port)
        time.sleep(1)
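A minimal invocation, assuming the surrounding script defines log() and passes port numbers as strings (as the %-formatting above suggests):

# Free the ports held by a stray dev server before restarting it.
kill_ports(['8000', '8080'])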
def dir_from_output(output):
    """Get library directory based on the output of clang.

    Args:
        output (str): raw output from clang

    Returns:
        str: path to folder with libclang
    """
    log.debug("real output: %s", output)
    if platform.system() == "Darwin":
        # [HACK] uh... I'm not sure why it happens like this...
        folder_to_search = path.join(output, '..', '..')
        log.debug("folder to search: %s", folder_to_search)
        return folder_to_search
    elif platform.system() == "Windows":
        log.debug("architecture: %s", platform.architecture())
        return path.normpath(output)
    elif platform.system() == "Linux":
        return path.normpath(path.dirname(output))
    return None
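One way the output argument might be produced (an assumption; the snippet itself does not show how clang is invoked) is to ask clang where it resolves libclang and feed the answer in:

import subprocess

# Library name and invocation are assumptions for illustration.
output = subprocess.check_output(
    ['clang', '-print-file-name=libclang.so'],
    universal_newlines=True).strip()
libclang_dir = dir_from_output(output)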
def test_cont_feature(self, query, feature, epsilon, min_val=None,
                      max_val=None):
    """
    Find splits on a continuous feature
    """
    if min_val is None:
        min_val = feature.min_val
    if max_val is None:
        max_val = feature.max_val
    query_max = make_query(query, feature.name, max_val)
    max_id = self.predict(query_max)
    query_min = make_query(query, feature.name, min_val)
    min_id = self.predict(query_min)
    logging.log(DEBUG, '\tmin val {} got {}'.format(min_val, min_id))
    logging.log(DEBUG, '\tmax val {} got {}'.format(max_val, max_id))
    # search for any splitting thresholds
    thresholds = sorted(self.line_search(query, feature.name, min_val,
                                         max_val, min_id, max_id, epsilon))
    logging.log(DEBUG, '\tthresholds: {}'.format(thresholds))
    return thresholds
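The make_query helper is referenced above and in the next snippet but not shown; a plausible sketch (purely an assumption about its behavior) returns a copy of the query with a single feature overridden:

def make_query(query, feature_name, value):
    # Hypothetical helper: copy the query dict and override one feature.
    new_query = dict(query)
    new_query[feature_name] = value
    return new_query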
def test_cat_feature(self, query, feature, categories=None):
    """
    Find splits on a categorical feature
    """
    if not categories:
        categories = feature.vals
    # map of a leaf's ID to all the values that lead to it
    cat_ids = {}
    for val in categories:
        # test each value one after the other
        query_cat = make_query(query, feature.name, val)
        cat_id = self.predict(query_cat)
        logging.log(DEBUG, '\t val {} got {}'.format(val, cat_id))
        if cat_id in cat_ids:
            cat_ids[cat_id].append(val)
        else:
            cat_ids[cat_id] = [val]
    return cat_ids
def merge_all_preds(self, preds):
    """
    Attempt to merge predicate paths for a given leaf identity
    """
    merged = []
    while preds:
        pred1 = preds.pop()
        found_merge = False
        for pred2 in copy(preds):
            try:
                pred3 = self.merge_preds(pred1, pred2)
                logging.log(DEBUG, 'merged to {}'.format(pred3))
                if pred3 == pred1 or pred3 == pred2:
                    logging.log(DEBUG, 'no new merge...')
                    continue
                preds += [pred3]
                found_merge = True
            except ValueError:
                pass
        if not found_merge:
            merged += [pred1]
    return merged
def _trace(msg, *args, **kw):
    logging.log(logging.TRACE, msg, *args, **kw)

def trace(self, msg, *args, **kw):
    self.log(logging.TRACE, msg, *args, **kw)
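logging has no built-in TRACE level, so the two helpers above only work if the level has been registered elsewhere; a typical registration (the numeric value 5 is an assumption) looks like:

import logging

# Register TRACE below DEBUG so logging.log(logging.TRACE, ...) resolves to a name.
logging.TRACE = 5
logging.addLevelName(logging.TRACE, "TRACE")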
def check_call(*args, **kwargs):
    logging.log(logging.INFO, "subprocess check_call: %s" % " ".join(*args))
    return subprocess.check_call(*args, **kwargs)

def call(*args, **kwargs):
    logging.log(logging.INFO, "subprocess call: %s" % " ".join(*args))
    return subprocess.call(*args, **kwargs)
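Because both wrappers log " ".join(*args), they expect the command in list form; a minimal call might look like:

# Logs "subprocess check_call: git status --short" before running the command.
check_call(['git', 'status', '--short'])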
def log_info(text):
    logging.log(APP_LEVEL, text)
def parse(self, response):
    content_info = response.css('div.contentinfo')
    symbol1 = u'?'
    symbol2 = u'?'
    rex = r'%s(.*)%s' % (symbol1, symbol2)
    logging.log(logging.INFO, "rex=" + rex)
    title = content_info.css('h1 a::text').re_first(rex)
    logging.log(logging.INFO, title)
def parse_movie(self, response):
    content_info = response.css('div.contentinfo')
    movie = Movie()
    symbol1 = u'?'
    symbol2 = u'?'
    rex = r'%s(.*)%s' % (symbol1, symbol2)
    movie['title'] = content_info.css('h1 a::text').re_first(rex)
    # logging.log(logging.INFO, "parse_movie " + movie['title'])
    text = content_info.css('div#text')
    t_msg_font = text.css('div.t_msgfont')
    if len(t_msg_font) > 0:
        movie['cover'] = t_msg_font.css(' img::attr(src)').extract_first()
        movie['detail'] = self.parse_detail(t_msg_font.css('::text'))
    else:
        movie['cover'] = text.css(' p img::attr(src)').extract_first()
        movie['detail'] = self.parse_detail(text.css(' p::text'))
    thumbnails = text.css(' p img::attr(src)').extract()
    if movie['cover'] in thumbnails:
        thumbnails.remove(movie['cover'])
    movie['thumbnails'] = thumbnails
    download_links = text.css(' table tbody tr td a')
    download_links_array = []
    for link_item in download_links:
        download_link = DownloadLink()
        download_link['title'] = link_item.css('::text').extract_first()
        download_link['link'] = link_item.css('::attr(href)').extract_first()
        download_links_array.append(dict(download_link))
    movie['download_links'] = download_links_array
    return movie