def __ImportFrom(module_xname, xname):
"""
Import a specified name from a python module into the global namespace. This can be accomplished
programatically and inside a method or class.
@param module_xname : name of the module to load x.y.z
@param xname : symbol to import from loaded module
@return method/class : imported item from loaded module
"""
try:
module = __import__(module_xname,globals(),locals(), [xname])
except ImportError:
return None
try:
module = getattr(module,xname)
if xname:
for x in range( len(inspect.stack()) ):
inspect.currentframe(x).f_globals[xname]=module
except AttributeError as e:
module=None
return module
python类stack()的实例源码
def stat_daily_site(self, start=None, end=None):
"""
Extracting statistics per site from start to end (unixtimestamps)
:param start:
:param end:
:return:
"""
unixend = int(time.time()) if end is None else end
unixstart = (unixend - 52 * 7 * 24 * 3600) if start is None else start
content = self.sitecmdjson('/stat/report/daily.site', {
'attrs': ['bytes', 'wan-tx_bytes', 'wan-rx_bytes', 'wlan_bytes', 'num_sta', 'lan-num_sta', 'wlan-num_sta',
'time'],
'start': unixstart,
'end': unixend
})
return self.response(content, inspect.stack()[0].function, 'Daily Statistics for site')
def stat_hourly_ap(self, start=None, end=None):
"""
Extracting statistics per site from start to end (unixtimestamps)
:param start:
:param end:
:return:
"""
unixend = int(time.time()) if end is None else end
unixstart = (unixend - 7 * 24 * 3600) if start is None else start
content = self.sitecmdjson('/stat/report/hourly.ap', {
'attrs': ['bytes', 'num_sta', 'time'],
'start': unixstart,
'end': unixend
})
return self.response(content, inspect.stack()[0].function, 'Hourly Statistics for AP')
def set_ap_radiosettings(self, ap_id, radio='ng', channel=1, ht='20', tx_power_mode=0, tx_power=0):
"""
Set AP settings
:param ap_id:
:param radio:
:param channel:
:param ht:
:param tx_power_mode:
:param tx_power:
:return:
"""
content = self.sitecmdjson('/upd/device/' + urllib.parse.quote(ap_id), {
'radio': radio,
'channel': channel,
'ht': ht,
'tx_power_mode': tx_power_mode,
'tx_power': tx_power
})
return self.response(content, inspect.stack()[0].function, 'AP Radio Settings')
def set_ap_network(self, ap_id, type="dhcp", ip="192.168.1.6", netmask="255.255.255.0", gateway="192.168.1.1", dns1="8.8.8.8", dns2="8.8.4.4"):
"""
Configure network
:param ap_id:
:param type:
:param ip:
:param netmask:
:param gateway:
:param dns1:
:param dns2:
:return:
"""
content = self.sitecmdjson('/rest/device/' + str(ap_id), {
"config_network": [
{
"type": type,
"ip": ip,
"netmask": netmask,
"gateway": gateway,
"dns1": dns1,
"dns2": dns2
}
]
}, method='PUT')
return self.response(content, inspect.stack()[0].function, 'AP Network Config')
def add_hotspot2(self, name, network_access_internet=True, network_type=2, venue_group=2, venue_type=0):
"""
Add HotSpot 2.0, simple settings
:param name:
:param network_access_internet:
:param network_type:
:param venue_group:
:param venue_type:
:return:
"""
content = self.sitecmdjson('/rest/hotspot2conf', {
"name": name,
"network_access_internet": network_access_internet,
"network_type": network_type,
"venue_group": venue_group,
"venue_type": venue_type
})
return self.response(content, inspect.stack()[0].function, 'Add Hotspot 2.0')
def set_hotspot2(self, hs_id, name=None, network_access_internet=None, network_type=None, venue_group=None, venue_type=None):
"""
Modify Hotspot 2.0
:param hs_id:
:param name:
:param network_access_internet:
:param network_type:
:param venue_group:
:param venue_type:
:return:
"""
content = self.sitecmdjson('/rest/hotspot2conf/'+str(hs_id), {
"_id": hs_id,
"name": name,
"network_access_internet": network_access_internet,
"network_type": network_type,
"venue_group": venue_group,
"venue_type": venue_type
}, method="PUT")
return self.response(content, inspect.stack()[0].function, 'Modify Hotspot 2.0 sites')
def log_assert(bool_, message="", logger=None, logger_name="", verbose=False):
"""Use this as a replacement for assert if you want the failing of the
assert statement to be logged."""
if logger is None:
logger = logging.getLogger(logger_name)
try:
assert bool_, message
except AssertionError:
# construct an exception message from the code of the calling frame
last_stackframe = inspect.stack()[-2]
source_file, line_no, func = last_stackframe[1:4]
source = "Traceback (most recent call last):\n" + \
' File "%s", line %s, in %s\n ' % (source_file, line_no, func)
if verbose:
# include more lines than that where the statement was made
source_code = open(source_file).readlines()
source += "".join(source_code[line_no - 3:line_no + 1])
else:
source += last_stackframe[-2][0].strip()
logger.debug("%s\n%s" % (message, source))
raise AssertionError("%s\n%s" % (message, source))
def psome(obj, count):
"""
The obj will printed only a number of count times.
"""
global d
key = inspect.stack()[1][1]+':'+str(inspect.stack()[1][2])
d[key] = d.get(key,count) -1
if d[key]>=0:
print(obj)
# From test.py
def parseFile( self, file_or_filename, parseAll=False ):
"""Execute the parse expression on the given file or filename.
If a filename is specified (instead of a file object),
the entire file is opened, read, and closed before parsing.
"""
try:
file_contents = file_or_filename.read()
except AttributeError:
f = open(file_or_filename, "r")
file_contents = f.read()
f.close()
try:
return self.parseString(file_contents, parseAll)
except ParseBaseException as exc:
if ParserElement.verbose_stacktrace:
raise
else:
# catch and re-raise exception from here, clears out pyparsing internal stack trace
raise exc
def filter(self, record):
# skip this function
stack = inspect.stack()[1:]
for i, (frame, path, ln, func, line, xx) in enumerate(stack):
if (frame.f_globals.get('__name__') == 'pykit.ututil'
and func == 'dd'):
# this frame is dd(), find the caller
_, path, ln, func, line, xx = stack[i + 1]
record._fn = os.path.basename(path)
record._ln = ln
record._func = func
return True
record._fn = record.filename
record._ln = record.lineno
record._func = record.funcName
return True
def _find_frame_by_self(clz):
"""
Find the first frame on stack in which there is local variable 'self' of
type clz.
"""
frame = inspect.currentframe()
while frame:
self = frame.f_locals.get('self')
if isinstance(self, clz):
return frame
frame = frame.f_back
return None
def patch(self, orig, replace):
if not hasattr(self, '__patches'):
self.__patches = []
self.addCleanup(unpatch, self)
f = inspect.stack()[1][0]
parts = orig.split('.')
v = f.f_globals.copy()
v.update(f.f_locals)
orig = v[parts[0]]
for part in parts[1:-1]:
orig = getattr(orig, part)
to_replace = getattr(orig, parts[-1])
self.__patches.append((orig, parts[-1], to_replace))
setattr(orig, parts[-1], replace)
def parseFile( self, file_or_filename, parseAll=False ):
"""Execute the parse expression on the given file or filename.
If a filename is specified (instead of a file object),
the entire file is opened, read, and closed before parsing.
"""
try:
file_contents = file_or_filename.read()
except AttributeError:
f = open(file_or_filename, "r")
file_contents = f.read()
f.close()
try:
return self.parseString(file_contents, parseAll)
except ParseBaseException as exc:
if ParserElement.verbose_stacktrace:
raise
else:
# catch and re-raise exception from here, clears out pyparsing internal stack trace
raise exc
def debug(self, message):
if not self.opts['_debug']:
return
frm = inspect.stack()[1]
mod = inspect.getmodule(frm[0])
if mod is None:
modName = "Unknown"
else:
modName = mod.__name__
if self.dbh is None:
print '[' + modName + '] ' + message
else:
self._dblog("DEBUG", message, modName)
return
def goto(self, rTarget, **kwargs):
if callable(rTarget) is False:
rTarget = self.registry[rTarget]
if( "payload" in kwargs ):
self.tEvt['payload'] = kwargs['payload']
"""Transition to a new state (Cached Version)"""
caller = inspect.stack()[1][3] # hsm could be calling its super states
# we aren't tracking this in this, so we can
# just reference the call stack to find who
# discovered the event
#print "\nCurrent state: ", self.rCurr; print "Source state: ", self.rSource
self.__spy__(caller, self.tEvt['event'], self.tEvt['payload'])
# this is how you see the target
if self.rCurr['toLcaCache'].get(self.tEvt['event'], None) == None:
self.rCurr['toLcaCache'][self.tEvt['event']] = \
self.to_lca(self.hsm_dict[rTarget])
# self.pp.pprint(self.rCurr); print
self.exit(self.rCurr['toLcaCache'][self.tEvt['event']])
self.rNext = self.hsm_dict[rTarget]
def check_class():
"""
Return the class name for the current frame.
If the result is ** None ** means that the call is made from a module.
"""
# get frames
frames = inspect.stack()
cls = None
# should be the third frame
# 0: this function
# 1: function/decorator
# 2: class that contains the function
if len(frames) > 2:
frame = frames[2][0]
if '__module__' in frame.f_code.co_names:
cls = SillyClass(**frame.f_locals)
cls.__cls_name__ = frame.f_code.co_name
return cls
def debug(fn):
"""
Function/method decorator to trace calls via debug logging.
Is a pass-thru if we're not at LOG_LEVEL=DEBUG. Normally this
would have a lot of perf impact but this application doesn't
have significant throughput.
"""
@wraps(fn)
def wrapper(*args, **kwargs):
try:
# because we have concurrent processes running we want
# to tag each stack with an identifier for that process
arg = "[{}]".format(sys.argv[1])
except IndexError:
arg = "[pre_start]"
name = '{}{}{}'.format(arg, (len(inspect.stack()) * " "), fn.__name__)
log.debug('%s' % name)
out = apply(fn, args, kwargs)
log.debug('%s: %s', name, out)
return out
return wrapper
def test_5000_preflight_default_vhost_absolute(self):
"""Check that the default vhost works and can be identified
"""
self.real_test = "{0}_{1}".format(inspect.stack()[0][3],
self.tested_char_name)
self.setGravity(self.GRAVITY_MINOR)
if Register.hasFlag('abs_default_vhost', default=False):
self.skipTest("Preflight test already done.")
self.req.set_method('GET')
target_host = self.config.get('SERVER_HOST')
self.req.host = self.config.get('SERVER_HOST')
self.req.location = 'http://{0}{1}'.format(
target_host, self.req.location)
self._end_almost_regular_query(regular_expected=True)
Register.flag('abs_default_vhost', self.status == self.STATUS_ACCEPTED)
self.assertIn(self.status,
[self.STATUS_ACCEPTED],
'Bad response status {0}'.format(self.status))
def test_5002_preflight_non_default_vhost(self):
"""Check that the non default vhost works and can be identified
"""
# FIXME: on preflight: detect bad proxy configuration with
# empty echo query. Means the 2nd vhost proxy is not working
# TODO: do this test also on first proxy and test_regular
self.real_test = "{0}_{1}".format(inspect.stack()[0][3],
self.tested_char_name)
if not self.config.getboolean('MULTIPLE_HOSTS_TESTS'):
self.skipTest("Tests with multiple virtualhosts are not activated"
" in configuration.")
if Register.hasFlag('non_default_vhost', default=False):
self.skipTest("Preflight test already done.")
self.setGravity(self.GRAVITY_MINOR)
self.req.set_method('GET')
self.req.host = self.config.get('SERVER_NON_DEFAULT_HOST')
self.req.location = self.get_non_default_location(
with_prefix=self.use_backend_location)
self._end_almost_regular_query(regular_expected=True)
Register.flag('non_default_vhost',
self.status == self.STATUS_ACCEPTED)
self.assertIn(self.status,
[self.STATUS_ACCEPTED],
'Bad response status {0}'.format(self.status))
def test_0005_bad_pipeline(self):
"Chain 3 queries, second is bad, should stop the response stream."
self.real_test = "{0}".format(inspect.stack()[0][3])
self._prepare_pipe_test()
self.req2.set_location(Tools.NULL,
random=True)
self.req3 = Request(id(self))
self.req3.set_location(self.config.get('SERVER_DEFAULT_LOCATION'),
random=True)
with Client() as csock:
csock.send(self.req1)
csock.send(self.req2)
csock.send(self.req3)
responses = csock.read_all()
outmsg(str(responses))
self.analysis(responses,
expected_number=2,
regular_expected=False)
self.assertTrue((responses.count <= 2))
if (responses.count > 1):
self.assertIn(self.status,
[self.STATUS_REJECTED, self.STATUS_ERR400],
'Bad response status {0}'.format(self.status))
def test_2000_preflight_regular_chunked_get(self):
"""Let's start by a regular GET with chunked body
"""
self.real_test = "{0}".format(inspect.stack()[0][3])
self.setGravity(self.GRAVITY_MINOR)
self.req.set_method('GET')
self.req.add_header('Transfer-Encoding', 'chunked')
self.req.add_header('Content-Type',
'application/x-www-form-urlencoded')
self.req.add_chunk('Hello')
self.req.add_chunk('World')
self._end_almost_regular_query()
Register.flag('get_chunk_{0}'.format(self.reverse_proxy_mode),
(self.status == self.STATUS_ACCEPTED))
self.assertIn(self.status,
[self.STATUS_ACCEPTED, self.STATUS_ERR405],
'Bad response status {0}'.format(self.status))
def test_2001_preflight_regular_chunked_post(self):
"""If get is not good, try POST for chunked queries.
"""
self.real_test = "{0}".format(inspect.stack()[0][3])
self.setGravity(self.GRAVITY_MINOR)
self.req.set_method('POST')
self.req.add_header('Transfer-Encoding', 'chunked')
self.req.add_header('Content-Type',
'application/x-www-form-urlencoded')
self.req.add_chunk('Hello')
self.req.add_chunk('World')
self._end_almost_regular_query()
Register.flag('post_chunk_{0}'.format(self.reverse_proxy_mode),
(self.status == self.STATUS_ACCEPTED))
self.assertIn(self.status,
[self.STATUS_ACCEPTED, self.STATUS_ERR405],
'Bad response status {0}'.format(self.status))
def test_2020_bad_chunked_transfer_encoding(self):
"""chunk not used as last marker in Transfer-Encoding header.
"""
self.real_test = "{0}".format(inspect.stack()[0][3])
self.setGravity(self.GRAVITY_MINOR)
method = self._get_valid_chunk_method()
self.req.method = method
# chunk MUST be the last marker
self.req.add_header('Transfer-Encoding', 'chunked, zorg')
self.req.add_header('Content-Type',
'application/x-www-form-urlencoded')
self.req.add_chunk('Hello')
self.req.add_chunk('World')
self._end_expected_error()
def test_3013_line_prefix(self):
"Some characters before the query..."
self.real_test = "{0}_{1}".format(
inspect.stack()[0][3],
Tools.show_chars(self.separator))
self.setGravity(BaseTest.GRAVITY_MINOR)
self.req.set_first_line_prefix(self.separator)
# for RP mode:
self.transmission_map = {
'{0}GET'.format(
self.separator): self.STATUS_TRANSMITTED_EXACT,
' GET': self.STATUS_TRANSMITTED_CRAP,
}
self._add_default_status_map(
valid=self.valid_prefix,
always_allow_rejected=self.can_be_rejected
)
# local changes
self.status_map[self.STATUS_TRANSMITTED_EXACT] = self.GRAVITY_MINOR
self.status_map[self.STATUS_TRANSMITTED] = self.GRAVITY_MINOR
self._end_1st_line_query()
def test_3014_line_suffix(self):
"Let's add some garbage after the protocol."
self.real_test = "{0}_{1}".format(
inspect.stack()[0][3],
Tools.show_chars(self.separator))
self.setGravity(BaseTest.GRAVITY_WARNING)
self.req.set_first_line_suffix(self.separator)
# for RP mode:
self.transmission_map = {
' HTTP/1.0{0}\r\n'.format(
self.separator): self.STATUS_TRANSMITTED_EXACT,
' HTTP/1.1{0}\r\n'.format(
self.separator): self.STATUS_TRANSMITTED_EXACT,
' HTTP/1.1 \r\n': self.STATUS_TRANSMITTED_CRAP,
}
self._add_default_status_map(
valid=False)
self._end_1st_line_query(http09_allowed=False)
def test_3016_line_suffix_with_char_H(self):
"Nginx, for example, likes the H character"
self.real_test = "{0}_{1}".format(
inspect.stack()[0][3],
Tools.show_chars(self.separator))
self.setGravity(BaseTest.GRAVITY_WARNING)
self.req.set_first_line_suffix(self.separator + u'H')
# for RP mode:
self.transmission_map = {
' HTTP/1.0{0}H\r\n'.format(
self.separator): self.STATUS_TRANSMITTED_EXACT,
' HTTP/1.1{0}H\r\n'.format(
self.separator): self.STATUS_TRANSMITTED_EXACT,
'HTTP/1.1{0}H H'.format(
self.separator): self.STATUS_TRANSMITTED_CRAP,
'HTTP/1.1 H\r\n': self.STATUS_TRANSMITTED_CRAP,
}
self._add_default_status_map(
valid=False)
self._end_1st_line_query(http09_allowed=False)
def test_3017_line_suffix_with_double_HTTP11(self):
"Ending first line with two times the protocol"
self.real_test = "{0}_{1}".format(
inspect.stack()[0][3],
Tools.show_chars(self.separator))
self.req.set_first_line_suffix(self.separator + u'HTTP/1.1')
# for RP mode:
self.transmission_map = {
'HTTP/1.1{0}HTTP/1.1\r\n'.format(
self.separator): self.STATUS_TRANSMITTED_EXACT,
'HTTP/1.0{0}HTTP/1.0\r\n'.format(
self.separator): self.STATUS_TRANSMITTED_CRAP,
'HTTP/1.0{0}HTTP/1.1\r\n'.format(
self.separator): self.STATUS_TRANSMITTED_CRAP,
'HTTP/1.1{0}HTTP/1.0\r\n'.format(
self.separator): self.STATUS_TRANSMITTED_CRAP,
}
self._add_default_status_map(
valid=False)
self._end_1st_line_query(http09_allowed=False)
def test_3020_multiple_cr_line_prefix(self):
# TODO: this is maybe allowed, but we do not have any LF here...
self.separator = u'\r\r\r\r\r'
self.real_test = "{0}_{1}".format(
inspect.stack()[0][3],
Tools.show_chars(self.separator))
self.setGravity(BaseTest.GRAVITY_MINOR)
self.req.set_first_line_prefix(self.separator)
# for RP mode:
self.transmission_map = {
'{0}GET'.format(
self.separator): self.STATUS_TRANSMITTED_EXACT,
' GET': self.STATUS_TRANSMITTED_CRAP,
}
self._add_default_status_map(
valid=False,
always_allow_rejected=True)
self._end_1st_line_query(http09_allowed=False)
def test_3021_crlf_line_prefix(self):
self.separator = u'\r\n'
self.real_test = "{0}_{1}".format(
inspect.stack()[0][3],
Tools.show_chars(self.separator))
self.setGravity(BaseTest.GRAVITY_MINOR)
self.req.set_first_line_prefix(self.separator)
# for RP mode:
self.transmission_map = {
'{0}GET'.format(
self.separator): self.STATUS_TRANSMITTED_EXACT,
' GET': self.STATUS_TRANSMITTED_CRAP,
}
self._add_default_status_map(
valid=True,
always_allow_rejected=False) # this is allowed in RFC
self._end_1st_line_query(http09_allowed=False)