def test_3023_crcrcrlf_line_prefix(self):
    # Prefix the first line of the request with a run of CR characters
    # closed by one LF, then run the first-line query with HTTP/0.9 refused.
    # NOTE(review): the method name suggests three CRs but the literal below
    # holds five -- confirm which one is intended.
    prefix = u'\r\r\r\r\r\n'
    self.separator = prefix
    # Frame 0 of the stack is this method, so field 3 is its own name.
    own_name = inspect.stack()[0][3]
    printable_prefix = Tools.show_chars(self.separator)
    self.real_test = "{0}_{1}".format(own_name, printable_prefix)
    self.setGravity(BaseTest.GRAVITY_MINOR)
    self.req.set_first_line_prefix(self.separator)
    # for RP mode:
    exact_key = '{0}GET'.format(self.separator)
    self.transmission_map = {
        exact_key: self.STATUS_TRANSMITTED_EXACT,
        ' GET': self.STATUS_TRANSMITTED_CRAP,
    }
    self._add_default_status_map(valid=True, always_allow_rejected=True)
    self._end_1st_line_query(http09_allowed=False)
# Example source code using Python's stack() (inspect module)
def test_3040_http_65536_9(self):
    """Integer overflow attempt on protocol version.

    Requests protocol version 65536.9 with a trailing ``last=marker``
    argument, builds the default status map with ``valid=False`` and then
    assigns critical gravity to the transmitted, 0.9-OK and 0.9-downgrade
    statuses before running the first-line query.
    """
    # Frame 0 of the stack is this method, so field 3 is its own name.
    own_name = inspect.stack()[0][3]
    self.real_test = "{0}".format(own_name)
    self.req.set_major_version(65536)
    self.req.set_minor_version(9)
    self.req.add_argument('last', 'marker')
    # for RP mode:
    exact = self.STATUS_TRANSMITTED_EXACT
    crap = self.STATUS_TRANSMITTED_CRAP
    self.transmission_map = {
        ' HTTP/65536.9\r\n': exact,
        'HTTP/65536.9 H': crap,
        'last=marker\r\n': crap,
    }
    self._add_default_status_map(valid=False)
    # local changes
    for status in (self.STATUS_TRANSMITTED,
                   self.STATUS_09OK,
                   self.STATUS_09DOWNGRADE):
        self.status_map[status] = self.GRAVITY_CRITICAL
    self._end_1st_line_query(http09_allowed=False)
def test_3041_http_0_9_explicit(self):
    """HTTP/0.9 does not exist. Should be an empty protocol.

    Requests an explicit protocol version 0.9 with a trailing
    ``last=marker`` argument, builds the default status map with
    ``valid=False`` and then assigns critical gravity to the transmitted,
    0.9-OK and 0.9-downgrade statuses before running the first-line query.
    """
    # Frame 0 of the stack is this method, so field 3 is its own name.
    own_name = inspect.stack()[0][3]
    self.real_test = "{0}".format(own_name)
    self.req.set_major_version(0)
    self.req.set_minor_version(9)
    self.req.add_argument('last', 'marker')
    # for RP mode:
    exact = self.STATUS_TRANSMITTED_EXACT
    crap = self.STATUS_TRANSMITTED_CRAP
    self.transmission_map = {
        ' HTTP/0.9\r\n': exact,
        'HTTP/0.9 H': crap,
        'last=marker\r\n': crap,
    }
    self._add_default_status_map(valid=False)
    # local changes
    for status in (self.STATUS_TRANSMITTED,
                   self.STATUS_09OK,
                   self.STATUS_09DOWNGRADE):
        self.status_map[status] = self.GRAVITY_CRITICAL
    self._end_1st_line_query(http09_allowed=False)
def test_3043_http_minus_1_0(self):
    """HTTP/-1.0 does not exist.

    Requests protocol version -1.0 with a trailing ``last=marker``
    argument, builds the default status map with ``valid=False`` and then
    assigns critical gravity to the transmitted, 0.9-OK and 0.9-downgrade
    statuses before running the first-line query.
    """
    # Frame 0 of the stack is this method, so field 3 is its own name.
    own_name = inspect.stack()[0][3]
    self.real_test = "{0}".format(own_name)
    self.req.set_major_version(-1)
    self.req.set_minor_version(0)
    self.req.add_argument('last', 'marker')
    # for RP mode:
    exact = self.STATUS_TRANSMITTED_EXACT
    crap = self.STATUS_TRANSMITTED_CRAP
    self.transmission_map = {
        ' HTTP/-1.0\r\n': exact,
        'HTTP/-1.0 H': crap,
        'HTTP/-1.1': crap,
        'last=marker\r\n': crap,
    }
    self._add_default_status_map(valid=False)
    # local changes
    for status in (self.STATUS_TRANSMITTED,
                   self.STATUS_09OK,
                   self.STATUS_09DOWNGRADE):
        self.status_map[status] = self.GRAVITY_CRITICAL
    self._end_1st_line_query(http09_allowed=False)
def test_3044_http_1_nodigit(self):
    """HTTP/1.x1 does not exist.

    Requests major version 1 with the non-numeric minor version ``x1`` and
    a trailing ``last=marker`` argument, builds the default status map with
    ``valid=False`` and then assigns critical gravity to the transmitted,
    0.9-OK and 0.9-downgrade statuses before running the first-line query.
    """
    # Frame 0 of the stack is this method, so field 3 is its own name.
    own_name = inspect.stack()[0][3]
    self.real_test = "{0}".format(own_name)
    self.req.set_major_version(1)
    self.req.set_minor_version('x1')
    self.req.add_argument('last', 'marker')
    # for RP mode:
    exact = self.STATUS_TRANSMITTED_EXACT
    crap = self.STATUS_TRANSMITTED_CRAP
    self.transmission_map = {
        ' HTTP/1.x1\r\n': exact,
        'HTTP/1.0 H': crap,
        'HTTP/1.1 H': crap,
        'HTTP/1.x1 H': crap,
        'last=marker\r\n': crap,
    }
    self._add_default_status_map(valid=False)
    # local changes
    for status in (self.STATUS_TRANSMITTED,
                   self.STATUS_09OK,
                   self.STATUS_09DOWNGRADE):
        self.status_map[status] = self.GRAVITY_CRITICAL
    self._end_1st_line_query(http09_allowed=False)
def test_3045_http_version_too_much_digits(self):
    """New RFC is quite strict, DIGIT.DIGIT only, in fact.

    Requests protocol version 11.11 with a trailing ``last=marker``
    argument, builds the default status map with ``valid=False`` and then
    assigns critical gravity to the transmitted, 0.9-OK and 0.9-downgrade
    statuses before running the first-line query.
    """
    # Frame 0 of the stack is this method, so field 3 is its own name.
    own_name = inspect.stack()[0][3]
    self.real_test = "{0}".format(own_name)
    self.req.set_major_version(11)
    self.req.set_minor_version(11)
    self.req.add_argument('last', 'marker')
    # for RP mode:
    exact = self.STATUS_TRANSMITTED_EXACT
    crap = self.STATUS_TRANSMITTED_CRAP
    self.transmission_map = {
        ' HTTP/11.11\r\n': exact,
        'HTTP/11.0': crap,
        '.11 H': crap,
        'last=marker\r\n': crap,
    }
    self._add_default_status_map(valid=False)
    # local changes
    for status in (self.STATUS_TRANSMITTED,
                   self.STATUS_09OK,
                   self.STATUS_09DOWNGRADE):
        self.status_map[status] = self.GRAVITY_CRITICAL
    self._end_1st_line_query(http09_allowed=False)
def _get(self, database_type, ip_address):
    """Look up ``ip_address`` in the underlying database reader.

    :param database_type: value that must be found (``in`` test) in the
        ``database_type`` reported by ``self.metadata()``
    :param ip_address: address handed to ``self._db_reader.get``
    :return: the record returned by the reader
    :raises TypeError: when ``database_type`` is not found in the metadata's
        database type; the message names the function two frames up the stack
    :raises geoip2.errors.AddressNotFoundError: when the reader returns None
    """
    if database_type in self.metadata().database_type:
        found = self._db_reader.get(ip_address)
        if found is not None:
            return found
        raise geoip2.errors.AddressNotFoundError(
            "The address %s is not in the database." % ip_address)

    # Two frames up: past this helper and its direct caller.
    # presumably that is the public lookup method -- verify against callers
    offending_method = inspect.stack()[2][3]
    template = "The %s method cannot be used with the %s database"
    raise TypeError(
        template % (offending_method, self.metadata().database_type))
def __ImportModule( module_name, asName=None ):
    """
    Import a python module as needed into the global namespace. This can be accomplished
    programmatically and inside a method or class.
    @param module_name : name of the module to load x.y.z
    @param asName      : optional alias; when truthy, the loaded module is stored under
                         this name in the globals of every frame on the current call stack
    @return module     : python module object that was loaded
    """
    # __import__('x.y.z') returns the top-level package 'x', so walk down
    # the remaining dotted components to reach the leaf module.
    module = __import__(module_name)
    for layer in module_name.split('.')[1:]:
        module = getattr(module, layer)
    if asName:
        # inspect.currentframe() accepts no depth argument on Python 3, so
        # follow the f_back links instead of indexing frames by depth.
        frame = inspect.currentframe()
        try:
            while frame is not None:
                frame.f_globals[asName] = module
                frame = frame.f_back
        finally:
            # Drop the local frame reference so it cannot keep frames alive.
            del frame
    return module
async def __aenter__(self):
    """Create a uniquely named model on the current controller and return it.

    Declared ``async`` because the body awaits the controller calls and
    ``async with`` requires ``__aenter__`` to return an awaitable.

    The model name is ``test-<test_run_nonce>-<caller>-<nonce>``, where
    ``<caller>`` is the name of the calling function with underscores turned
    into dashes and ``<nonce>`` is the last four hex digits of a fresh UUID.

    :return: the model object returned by ``Controller.add_model``
    """
    model_nonce = uuid.uuid4().hex[-4:]
    # Frame 1 is whoever entered this context manager.
    frame = inspect.stack()[1]
    test_name = frame.function.replace('_', '-')
    jujudata = TestJujuData()
    self._controller = Controller(jujudata=jujudata)
    controller_name = jujudata.current_controller()
    user_name = jujudata.accounts()[controller_name]['user']
    await self._controller.connect(controller_name)

    model_name = 'test-{}-{}-{}'.format(
        test_run_nonce,
        test_name,
        model_nonce,
    )
    self._model = await self._controller.add_model(model_name)

    # Change the JujuData instance so that it will return the new
    # model as the current model name, so that we'll connect
    # to it by default.
    jujudata.set_model(
        controller_name,
        user_name + "/" + model_name,
        self._model.info.uuid,
    )

    # save the model UUID in case test closes model
    self._model_uuid = self._model.info.uuid

    return self._model
def get_current_func(self):
    '''
    Report the name of the function that called this method.

    :return: the function name recorded for the caller's stack frame
    '''
    # Frame 0 is this method; frame 1 is whoever called it.
    caller_record = inspect.stack()[1]
    return caller_record[3]
def __repr__(self):
    """Return the helper's repr, or invoke the helper for a '?' caller.

    When the calling frame's function name is ``'?'`` the instance is
    called and an empty string is returned; otherwise a fixed repr string
    is returned.
    """
    # NOTE(review): '?' is presumably the frame name older interpreters gave
    # to code typed at the interactive prompt -- confirm.
    caller_name = inspect.stack()[1][3]
    if caller_name != '?':
        return '<pydoc.Helper instance>'
    self()
    return ''
def get_stack_path(i=0, depth=None):
    '''Build a colon-separated label describing the current call path.

    The call stack is sliced as ``[(2 + i):depth]``, so with the defaults
    the two innermost frames (this function and its direct caller) are left
    out. Each remaining frame becomes ``[<file name>]<function name>`` and
    the pieces are joined outermost frame first.

    :param i: extra number of innermost frames to skip (may be negative)
    :param depth: end index of the slice taken from the stack list
    :return: the joined label, or an empty string when the slice is empty
    '''
    selected = inspect.stack()[(2 + i):depth]
    labels = [
        '[{}]{}'.format(os.path.basename(record.filename), record.function)
        for record in reversed(selected)
    ]
    return ':'.join(labels)
def __init__(self, name, *values, **options):
    """Create a new rule definition and register it with the fuzzer.

    Instantiating a rule definition adds it to the current ``GramFuzzer``
    instance.

    :param str name: The name of the rule being defined
    :param list values: The list of values that define the value of the rule
        (will be concatenated when built)
    :param str sep: Stored as ``self.sep``; falls back to the existing
        ``self.sep`` attribute when not supplied
    :param str cat: The category to create the rule in (default=``"default"``).
    :param bool no_prune: If this rule should not be pruned *EVEN IF* it is found to be
        unreachable (default=``False``)
    """
    self.name = name
    self.options = options
    self.values = list(values)

    # setdefault reads the caller's choice and also records the effective
    # value back into self.options.
    for option_name in ("sep", "cat", "no_prune"):
        fallback = getattr(self, option_name)
        setattr(self, option_name,
                self.options.setdefault(option_name, fallback))

    self.fuzzer = GramFuzzer.instance()

    # Frame 1 is the code that instantiated this rule; its file name (minus
    # the .py/.pyc suffix text) is used as the grammar file name.
    caller_record = inspect.stack()[1]
    caller_frame = caller_record[0]
    caller_path = caller_record[1]
    module_name = os.path.basename(caller_path).replace(".pyc", "").replace(".py", "")

    caller_locals = caller_frame.f_locals
    if "TOP_CAT" in caller_locals:
        self.fuzzer.cat_group_defaults[module_name] = caller_locals["TOP_CAT"]

    self.fuzzer.add_definition(
        self.cat,
        self.name,
        self,
        no_prune=self.no_prune,
        gram_file=module_name,
    )
def get_program_name():
    """Return the base file name recorded for the outermost stack frame."""
    outermost_record = inspect.stack()[-1]
    source_path = outermost_record[1]
    return os.path.basename(source_path)
def get_test_path(filename):
    """Return the path of ``filename`` inside the caller's ``test_data`` folder.

    The folder is looked up next to the (symlink-resolved) source file of
    the module that called this function.

    :param filename: name of the file inside ``test_data``
    :return: ``<caller module directory>/test_data/<filename>``
    """
    # Frame 1 is the caller; element 0 of its record is the frame object.
    caller_frame = inspect.stack()[1][0]
    caller_module = inspect.getmodule(caller_frame)
    module_file = os.path.realpath(caller_module.__file__)
    # Joining the parent-directory marker onto the file path and normalising
    # it yields the directory that holds the file.
    module_dir = os.path.abspath(os.path.join(module_file, os.pardir))
    return os.path.join(module_dir, 'test_data', filename)
def __init__(self,
             name=None,
             router=None,
             load_env=True,
             log_config=LOGGING):
    """Set up logging and the basic attributes of the application object.

    :param name: application name; when None it is derived from the file
        name of the calling stack frame via ``getmodulename``
    :param router: router instance to use; a new ``PathRouter`` when falsy
    :param load_env: forwarded to ``Config(load_env=...)``
    :param log_config: logging configuration passed to
        ``logging.config.dictConfig`` when truthy
    """
    if log_config:
        logging.config.dictConfig(log_config)

    # The default stream handler is skipped only when the root logger
    # already has handlers, ``log`` has no level set, and a log_config was
    # given; in every other case it is installed.
    logging_already_set = (logging.root.handlers and
                           log.level == logging.NOTSET and
                           log_config)
    if not logging_already_set:
        default_handler = logging.StreamHandler()
        default_handler.setFormatter(
            logging.Formatter("%(asctime)s: %(levelname)s: %(message)s"))
        log.addHandler(default_handler)
        log.setLevel(logging.INFO)

    # Get name from previous stack frame
    if name is None:
        caller_record = stack()[1]
        name = getmodulename(caller_record[1])

    self.name = name
    self.config = Config(load_env=load_env)
    self.router = router or PathRouter()
    self.debug = None
    self.static_handler = None
def authorize_guest(self, mac, minutes=60, up=None, down=None, mbytes=None, apmac=None):
    """
    Authorize one guest

    Posts an ``authorize-guest`` command to ``/cmd/stamgr``. Optional
    values are only included in the payload when they are truthy.

    :param mac: the mac of the guest (sent lower-cased)
    :param minutes: for how many minutes it is authorized
    :param up: upstream bandwidth in kbits
    :param down: downstream bandwidth in kbits
    :param mbytes: data limit, sent as the ``bytes`` field
        (NOTE(review): the original doc gave the unit inconsistently -- confirm)
    :param apmac: AP mac address, sent lower-cased as ``ap_mac``
        (faster performance, per the original doc)
    :return: the result of ``self.response`` for the command's reply
    """
    payload = {
        'mac': mac.lower(),
        'cmd': 'authorize-guest',
        'minutes': minutes
    }
    optional_fields = (
        ('up', up),
        ('down', down),
        ('bytes', mbytes),
    )
    for field, value in optional_fields:
        if value:
            payload[field] = value
    if apmac:
        payload['ap_mac'] = apmac.lower()

    content = self.sitecmdjson('/cmd/stamgr', payload)
    # Frame 0 is this method, so .function is this method's own name.
    return self.response(content, inspect.stack()[0].function, 'Guest Authorization')
def unauthorize_guest(self, mac):
    """
    Unauthorize one guest

    Posts an ``unauthorize-guest`` command to ``/cmd/stamgr``.

    :param mac: the mac of the guest (sent lower-cased)
    :return: the result of ``self.response`` for the command's reply
    """
    content = self.sitecmdjson('/cmd/stamgr', {
        'cmd': 'unauthorize-guest',
        'mac': mac.lower()
    })
    # Frame 0 is this method, so .function is this method's own name.
    # The label previously read 'Guest Unuthorization' (misspelled).
    return self.response(content, inspect.stack()[0].function, 'Guest Unauthorization')
def kick_sta(self, mac):
    """
    Reconnect client device

    Posts a ``kick-sta`` command to ``/cmd/stamgr``.

    :param mac: the mac of the client device (sent lower-cased)
    :return: the result of ``self.response`` for the command's reply
    """
    payload = {
        'cmd': 'kick-sta',
        'mac': mac.lower()
    }
    content = self.sitecmdjson('/cmd/stamgr', payload)
    # Frame 0 is this method, so .function is this method's own name.
    own_name = inspect.stack()[0].function
    return self.response(content, own_name, 'Kicking Station')
def block_sta(self, mac):
    """
    Blocking client device

    Posts a ``block-sta`` command to ``/cmd/stamgr``.

    :param mac: the mac of the client device (sent lower-cased)
    :return: the result of ``self.response`` for the command's reply
    """
    payload = {
        'cmd': 'block-sta',
        'mac': mac.lower()
    }
    content = self.sitecmdjson('/cmd/stamgr', payload)
    # Frame 0 is this method, so .function is this method's own name.
    own_name = inspect.stack()[0].function
    return self.response(content, own_name, 'Blocking Station')