def matchreport(self, inamepart="",
                names="pytest_runtest_logreport pytest_collectreport", when=None):
    """Return the single recorded report that matches ``inamepart``.

    Reports are taken from ``self.getreports(names=names)``.  A report
    matches when ``inamepart`` is empty or equals one of the
    ``"::"``-separated components of the report's ``nodeid``.

    :param inamepart: node id component to look for; empty matches all.
    :param names: hook names forwarded to ``getreports``.
    :param when: if truthy, only reports whose ``when`` attribute equals
        this value are considered; otherwise passing reports whose
        ``when`` is not ``"call"`` are ignored.
    :return: the one matching report.
    :raises ValueError: if no report, or more than one report, matches.
    """
    matches = []
    for report in self.getreports(names=names):
        if when:
            if getattr(report, 'when', None) != when:
                continue
        else:
            try:
                ignorable = report.when != "call" and report.passed
            except AttributeError:
                # reports lacking these attributes are never ignored here
                ignorable = False
            if ignorable:
                # setup/teardown passing reports - let's ignore those
                continue
        if inamepart and inamepart not in report.nodeid.split("::"):
            continue
        matches.append(report)

    if not matches:
        raise ValueError("could not find test report matching %r: "
                         "no test reports at all!" % (inamepart,))
    if len(matches) > 1:
        raise ValueError(
            "found 2 or more testreports matching %r: %s" % (inamepart, matches))
    return matches[0]
python类test()的实例源码
def makefile(self, ext, *args, **kwargs):
    r"""Create new file(s) in the testdir by delegating to ``_makefile``.

    :param ext: extension the file should use, including the dot,
        e.g. ``".py"``.
    :param args: treated as strings and joined using newlines; the
        result is written as the contents of a file whose name is based
        on the test function requesting this fixture, e.g.
        ``testdir.makefile('.txt', 'line1', 'line2')``.
    :param kwargs: each keyword is the name of a file and its value is
        written as that file's contents, e.g.
        ``testdir.makefile('.ini', pytest='[pytest]\naddopts=-rs\n')``.
    :return: whatever ``self._makefile(ext, args, kwargs)`` returns.
    """
    positional_lines, named_files = args, kwargs
    return self._makefile(ext, positional_lines, named_files)
def inline_runsource(self, source, *cmdlineargs):
    """Write ``source`` to a python file and run it via ``inline_run``.

    The file is created with ``self.makepyfile(source)`` and its path is
    appended after the extra command line arguments.

    :param source: the source code of the test module.
    :param cmdlineargs: any extra command line arguments to use.
    :return: the result of ``self.inline_run`` (documented upstream as a
        :py:class:`HookRecorder` instance).
    """
    module_path = self.makepyfile(source)
    run_args = list(cmdlineargs)
    run_args.append(module_path)
    return self.inline_run(*run_args)
def parseconfig(self, *args):
    """Build a new pytest Config object from the given command line args.

    The arguments are first passed through ``self._ensure_basetemp``,
    then handed to ``_pytest.config._prepareconfig`` together with
    ``self.plugins``.  If :py:attr:`plugins` has been populated they
    should be plugin modules which will be registered with the
    PluginManager.

    :param args: command line arguments.
    :return: the config object produced by ``_prepareconfig``.
    """
    cmdline = self._ensure_basetemp(args)

    import _pytest.config

    cfg = _pytest.config._prepareconfig(cmdline, self.plugins)
    # The caller may do anything with this half-setup config object, so
    # register its unconfigure step as a finalizer to guarantee cleanup
    # (otherwise capturing could still be active, for example).
    self.request.addfinalizer(cfg._ensure_unconfigure)
    return cfg
def getitem(self, source, funcname="test_func"):
    """Return the test item for a test function.

    The items are obtained from ``self.getitems(source)`` and the first
    one whose ``name`` equals ``funcname`` is returned.

    :param source: the module source.
    :param funcname: the name of the test function for which the item
        must be returned.
    :return: the matching item.
    :raises AssertionError: if no item is named ``funcname``.
    """
    items = self.getitems(source)
    for item in items:
        if item.name == funcname:
            return item
    # Raise explicitly instead of ``assert 0``: assert statements are
    # stripped under ``python -O``, which would make this silently
    # return None.
    raise AssertionError("%r item not found in module:\n%s\nitems: %s" % (
        funcname, source, items))
def getmodulecol(self, source, configargs=(), withinit=False):
    """Return the module collection node for ``source``.

    ``source`` is written to a file with :py:meth:`makepyfile`, named
    after the requesting test function, and the node for that path is
    fetched with ``self.getnode``.  The config used is also stored on
    ``self.config``.

    :param source: the source code of the module to collect.
    :param configargs: any extra arguments to pass to
        :py:meth:`parseconfigure`.
    :param withinit: whether to also write an ``__init__.py`` file so
        the directory is a package.
    :return: the node returned by ``self.getnode(config, path)``.
    """
    module_name = self.request.function.__name__
    path = self.makepyfile(**{module_name: Source(source).strip()})
    if withinit:
        self.makepyfile(__init__="#")
    config = self.parseconfigure(path, *configargs)
    self.config = config
    return self.getnode(config, path)
def test_alienvault_scanners():
    """Check the "scanners" feed against the AlienVault sample file.

    Uses the ``rule`` and ``s`` objects defined outside this function.
    """
    rule.feeds['scanners']['remote'] = 'test/alienvault/feed.txt'
    results = list(s.process(rule, feed="scanners"))
    assert len(results) > 0

    ips = {entry.indicator for entry in results}
    tags = {entry.tags[0] for entry in results}

    assert '114.143.191.19' in ips
    assert '180.97.215.63' in ips

    assert 'scanner' in tags
def test_alienvault_spammers():
    """Check the "spammers" feed against the AlienVault sample file.

    Uses the ``rule`` and ``s`` objects defined outside this function.
    """
    rule.feeds['spammers']['remote'] = 'test/alienvault/feed.txt'
    results = list(s.process(rule, feed="spammers"))
    assert len(results) > 0

    ips = {entry.indicator for entry in results}
    tags = {entry.tags[0] for entry in results}

    assert '23.92.83.73' in ips
    assert '93.127.228.36' in ips

    assert 'spam' in tags
def test_alienvault_malware():
    """Check the "malware" feed against the AlienVault sample file.

    Uses the ``rule`` and ``s`` objects defined outside this function.
    """
    rule.feeds['malware']['remote'] = 'test/alienvault/feed.txt'
    results = list(s.process(rule, feed="malware"))
    assert len(results) > 0

    ips = {entry.indicator for entry in results}
    tags = {entry.tags[0] for entry in results}

    assert '93.158.211.210' in ips

    assert 'malware' in tags
def test_packetmail_iprep():
    """Check the "iprep" feed against the packetmail sample file.

    Unlike tests that only look at the first tag, every tag of every
    record is collected.  Uses the ``rule`` and ``s`` objects defined
    outside this function.
    """
    rule.feeds['iprep']['remote'] = 'test/packetmail/feed.txt'
    results = list(s.process(rule, feed="iprep"))
    assert len(results) > 0

    ips = {entry.indicator for entry in results}
    tags = {tag for entry in results for tag in entry.tags}

    assert '179.40.212.141' in ips
    assert '104.131.128.9' in ips

    assert 'honeynet' in tags
def test_malwaredomains_botnet():
    """Check the "botnet_domains" feed against the sample domains archive.

    Uses the ``rule`` and ``s`` objects defined outside this function.
    """
    rule.remote = 'test/malwaredomains/domains.zip'
    results = list(s.process(rule, feed="botnet_domains"))
    assert len(results) > 0
    assert len(results[0].indicator) > 4

    indicators = {entry.indicator for entry in results}
    tags = {tag for entry in results for tag in entry.tags}

    assert 'botnet' in tags
    assert 'attack_page' not in tags

    assert '9virgins.com' in indicators
def test_malwaredomains_registrars():
    """Check the "registrars" feed against the sample bulk-registrar archive.

    Uses the ``rule`` and ``s`` objects defined outside this function.
    """
    rule.remote = 'test/malwaredomains/bulk_registrars.zip'
    results = list(s.process(rule, feed="registrars"))
    assert len(results) > 0
    assert len(results[0].indicator) > 4

    indicators = {entry.indicator for entry in results}
    tags = {tag for entry in results for tag in entry.tags}

    assert 'registrar' in tags

    assert 'aaa.ai' in indicators
def test_malwaredomains_urlshorteners():
    """Check the "url_shorteners" feed against the sample shortener archive.

    Uses the ``rule`` and ``s`` objects defined outside this function.
    """
    rule.remote = 'test/malwaredomains/url_shorteners.zip'
    results = list(s.process(rule, feed="url_shorteners"))
    assert len(results) > 0
    assert len(results[0].indicator) > 4

    indicators = {entry.indicator for entry in results}
    tags = {tag for entry in results for tag in entry.tags}

    assert 'service' in tags

    assert 'url123.info' in indicators
def test_smrt_remote_regex():
    """Process the 'port-scanners' and 'port-scanners-fail' feeds.

    The first feed must yield results; the second must yield none, and a
    ``RuntimeError`` raised while processing it is tolerated.
    """
    rules_file = 'test/smrt/remote_regex.yml'
    # NOTE(review): assertion placement (inside vs. after each loop) was
    # reconstructed from a copy without indentation -- confirm.
    with Smrt(None, None, client='dummy') as smrt:
        assert type(smrt) is Smrt

        results = []
        for feed_rule, feed_name in smrt.load_feeds(rules_file, feed='port-scanners'):
            results = list(smrt.process(feed_rule, feed_name))
            assert len(results) > 0

        results = []
        for feed_rule, feed_name in smrt.load_feeds(rules_file, feed='port-scanners-fail'):
            try:
                results = list(smrt.process(feed_rule, feed_name))
            except RuntimeError:
                # a failure here leaves ``results`` empty, which is what
                # the assertion below expects
                pass
            assert len(results) == 0
def test_smrt_base():
    """Load feeds from a rules directory, a single rules file and a named feed.

    Every loaded feed must produce results.  Requesting the feed name
    'port-scanners2' may raise ``KeyError``, which is tolerated.
    """
    rules_dir = 'test/smrt/rules'
    csirtg_rules = 'test/smrt/rules/csirtg.yml'
    # NOTE(review): assertion placement (inside vs. after each loop) was
    # reconstructed from a copy without indentation -- confirm.
    with Smrt(REMOTE_ADDR, 1234, client='dummy') as smrt:
        assert type(smrt) is Smrt

        for feed_rule, feed_name in smrt.load_feeds(rules_dir):
            results = list(smrt.process(feed_rule, feed_name))
            assert len(results) > 0

        results = []
        for feed_rule, feed_name in smrt.load_feeds(csirtg_rules):
            results = list(smrt.process(feed_rule, feed_name))
            assert len(results) > 0

        results = []
        for feed_rule, feed_name in smrt.load_feeds(csirtg_rules, feed='port-scanners'):
            results = list(smrt.process(feed_rule, feed_name))
            assert len(results) > 0

        results = []
        try:
            feed_rule, feed_name = next(smrt.load_feeds(csirtg_rules, feed='port-scanners2'))
        except KeyError:
            pass

        # nothing is processed for the last feed, so the list stays empty
        assert len(results) == 0
def test_smrt_archiver_lasttime():
    """Process the 'lasttime' feed twice with one archiver.

    The first pass must yield indicators, including '216.243.31.2' with
    the expected ``lasttime``; the second pass with the same archiver
    must yield none (presumably because they were already archived).
    """
    import os
    import shutil

    # tempfile.mktemp() is deprecated and racy, and its file was never
    # removed; keep the database in a private directory cleaned up below.
    tmpdir = tempfile.mkdtemp()
    try:
        archiver = Archiver(dbfile=os.path.join(tmpdir, 'archiver.db'))
        rule = 'test/smrt/rules/archiver.yml'
        feed = 'lasttime'

        # NOTE(review): assertion placement (inside vs. after each loop)
        # was reconstructed from a copy without indentation -- confirm.
        with Smrt(REMOTE_ADDR, 1234, client='stdout', archiver=archiver) as s:
            assert type(s) is Smrt

            for r, f in s.load_feeds(rule, feed=feed):
                x = list(s.process(r, f))
                assert len(x) > 0

                by_indicator = {i.indicator: i.__dict__() for i in x}
                assert by_indicator['216.243.31.2']['lasttime'] == '2016-03-23T20:22:27.000000Z'

        with Smrt(REMOTE_ADDR, 1234, client='stdout', archiver=archiver) as s:
            assert type(s) is Smrt

            for r, f in s.load_feeds(rule, feed=feed):
                x = list(s.process(r, f))
                assert len(x) == 0
    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)
def test_smrt_archiver_firsttime():
    """Process the 'firsttime' feed twice with one archiver.

    The first pass must yield indicators, including '216.243.31.2' with
    the expected ``lasttime``; the second pass with the same archiver
    must yield none (presumably because they were already archived).
    """
    import os
    import shutil

    # tempfile.mktemp() is deprecated and racy, and its file was never
    # removed; keep the database in a private directory cleaned up below.
    tmpdir = tempfile.mkdtemp()
    try:
        archiver = Archiver(dbfile=os.path.join(tmpdir, 'archiver.db'))
        rule = 'test/smrt/rules/archiver.yml'
        feed = 'firsttime'

        # NOTE(review): assertion placement (inside vs. after each loop)
        # was reconstructed from a copy without indentation -- confirm.
        with Smrt(REMOTE_ADDR, 1234, client='stdout', archiver=archiver) as s:
            assert type(s) is Smrt

            for r, f in s.load_feeds(rule, feed=feed):
                x = list(s.process(r, f))
                assert len(x) > 0

                by_indicator = {i.indicator: i.__dict__() for i in x}
                assert by_indicator['216.243.31.2']['lasttime'] == '2016-03-23T20:22:27.000000Z'

        with Smrt(REMOTE_ADDR, 1234, client='stdout', archiver=archiver) as s:
            assert type(s) is Smrt

            for r, f in s.load_feeds(rule, feed=feed):
                x = list(s.process(r, f))
                assert len(x) == 0
    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)
def test_smrt_archiver_both():
    """Process the 'both' feed twice with one archiver.

    The first pass must yield indicators, including '216.243.31.2' with
    the expected ``lasttime``; the second pass with the same archiver
    must yield none (presumably because they were already archived).
    """
    import os
    import shutil

    # tempfile.mktemp() is deprecated and racy, and its file was never
    # removed; keep the database in a private directory cleaned up below.
    tmpdir = tempfile.mkdtemp()
    try:
        archiver = Archiver(dbfile=os.path.join(tmpdir, 'archiver.db'))
        rule = 'test/smrt/rules/archiver.yml'
        feed = 'both'

        # NOTE(review): assertion placement (inside vs. after each loop)
        # was reconstructed from a copy without indentation -- confirm.
        with Smrt(REMOTE_ADDR, 1234, client='stdout', archiver=archiver) as s:
            assert type(s) is Smrt

            for r, f in s.load_feeds(rule, feed=feed):
                x = list(s.process(r, f))
                assert len(x) > 0

                by_indicator = {i.indicator: i.__dict__() for i in x}
                assert by_indicator['216.243.31.2']['lasttime'] == '2016-03-23T20:22:27.000000Z'

        with Smrt(REMOTE_ADDR, 1234, client='stdout', archiver=archiver) as s:
            assert type(s) is Smrt

            for r, f in s.load_feeds(rule, feed=feed):
                x = list(s.process(r, f))
                assert len(x) == 0
    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)
def test_smrt_archiver_lasttime_clear():
    """Process the 'lasttime' feed twice, clearing the memcache in between.

    The first pass must yield indicators, including '216.243.31.2' with
    the expected ``lasttime``.  After ``archiver.clear_memcache()`` the
    second pass with the same archiver must still yield none.
    """
    import os
    import shutil

    # tempfile.mktemp() is deprecated and racy, and its file was never
    # removed; keep the database in a private directory cleaned up below.
    tmpdir = tempfile.mkdtemp()
    try:
        archiver = Archiver(dbfile=os.path.join(tmpdir, 'archiver.db'))
        rule = 'test/smrt/rules/archiver.yml'
        feed = 'lasttime'

        # NOTE(review): assertion placement (inside vs. after each loop)
        # was reconstructed from a copy without indentation -- confirm.
        with Smrt(REMOTE_ADDR, 1234, client='stdout', archiver=archiver) as s:
            assert type(s) is Smrt

            for r, f in s.load_feeds(rule, feed=feed):
                x = list(s.process(r, f))
                assert len(x) > 0

                by_indicator = {i.indicator: i.__dict__() for i in x}
                assert by_indicator['216.243.31.2']['lasttime'] == '2016-03-23T20:22:27.000000Z'

        archiver.clear_memcache()

        with Smrt(REMOTE_ADDR, 1234, client='stdout', archiver=archiver) as s:
            assert type(s) is Smrt

            for r, f in s.load_feeds(rule, feed=feed):
                x = list(s.process(r, f))
                assert len(x) == 0
    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)
def test_vxvault_urls():
    """Check the "urls" feed against the vxvault sample file.

    Uses the ``rule`` and ``s`` objects defined outside this function.
    """
    rule.feeds['urls']['remote'] = 'test/vxvault/feed.txt'
    results = list(s.process(rule, feed="urls"))
    assert len(results) > 0

    urls = {entry.indicator for entry in results}
    tags = {entry.tags[0] for entry in results}

    assert 'http://jeansowghtqq.com/85.exe' in urls

    assert 'malware' in tags
def test_bambenek_ipv4():
    """Check the "c2-ipmasterlist" feed against the bambenek IPv4 sample.

    Besides the expected indicators and tag, each record is required to
    have truthy ``lasttime``, ``description``, ``altid``, ``provider``,
    ``confidence`` and ``tags``.  Uses the ``rule`` and ``s`` objects
    defined outside this function.
    """
    rule.feeds['c2-ipmasterlist']['remote'] = 'test/bambenek/ipv4_feed.txt'
    results = list(s.process(rule, feed="c2-ipmasterlist"))
    assert len(results) > 0

    indicators, tags = set(), set()
    # NOTE(review): the per-record assertions are assumed to sit inside
    # the loop; reconstructed from a copy without indentation -- confirm.
    for entry in results:
        indicators.add(entry.indicator)
        tags.add(entry.tags[0])

        assert entry.lasttime
        assert entry.description
        assert entry.altid
        assert entry.provider
        assert entry.confidence
        assert entry.tags

    assert '141.8.225.68' in indicators
    assert '185.28.193.192' in indicators

    assert 'botnet' in tags
def test_abuse_ch_ransomware():
    """Check the "ransomware" feed against the abuse.ch sample file.

    The processed records are also pretty-printed.  Uses the ``rule``
    and ``s`` objects defined outside this function.
    """
    rule.feeds['ransomware']['remote'] = 'test/ransomware_abuse_ch/feed.txt'
    results = list(s.process(rule, feed="ransomware"))
    assert len(results) > 0

    from pprint import pprint
    pprint(results)

    indicators = {entry.indicator for entry in results}
    tags = {entry.tags[0] for entry in results}

    assert 'http://grandaareyoucc.asia/85.exe' in indicators

    assert 'botnet' in tags
def test_sphinx_alpha_too_big(self):
    """Unwrapping a packet whose header alpha has an extra byte must fail.

    A forward message is built for a 5-hop test route, its header is
    rebuilt with ``b"A"`` appended to ``alpha``, and
    ``sphinx_packet_unwrap`` is expected to raise
    ``HeaderAlphaGroupMismatchError``.
    """
    route = self.newTestRoute(5)
    rand_reader = RandReader()
    params = SphinxParams(5, 1024)
    original = SphinxPacket.forward_message(
        params, route, self.pki, b"dest", b"this is a test", rand_reader)

    # same beta/gamma/body, but alpha is one byte too long
    oversized_header = SphinxHeader(
        original.header.alpha + b"A",
        original.header.beta,
        original.header.gamma,
    )
    tampered = SphinxPacket(oversized_header, original.body)

    replay_cache = PacketReplayCacheDict()
    _public_key, private_key = generate_node_keypair(rand_reader)
    key_state = SphinxNodeKeyState(private_key)

    py.test.raises(HeaderAlphaGroupMismatchError, sphinx_packet_unwrap,
                   params, replay_cache, key_state, tampered)
def matchreport(self, inamepart="",
                names="pytest_runtest_logreport pytest_collectreport", when=None):
    """Return the single recorded report that matches ``inamepart``.

    Reports are taken from ``self.getreports(names=names)``.  A report
    matches when ``inamepart`` is empty or equals one of the
    ``"::"``-separated components of the report's ``nodeid``.

    :param inamepart: node id component to look for; empty matches all.
    :param names: hook names forwarded to ``getreports``.
    :param when: if truthy, only reports whose ``when`` attribute equals
        this value are considered; otherwise passing reports whose
        ``when`` is not ``"call"`` are ignored.
    :return: the one matching report.
    :raises ValueError: if no report, or more than one report, matches.
    """
    matches = []
    for report in self.getreports(names=names):
        if when:
            if getattr(report, 'when', None) != when:
                continue
        else:
            try:
                ignorable = report.when != "call" and report.passed
            except AttributeError:
                # reports lacking these attributes are never ignored here
                ignorable = False
            if ignorable:
                # setup/teardown passing reports - let's ignore those
                continue
        if inamepart and inamepart not in report.nodeid.split("::"):
            continue
        matches.append(report)

    if not matches:
        raise ValueError("could not find test report matching %r: "
                         "no test reports at all!" % (inamepart,))
    if len(matches) > 1:
        raise ValueError(
            "found 2 or more testreports matching %r: %s" % (inamepart, matches))
    return matches[0]
def makefile(self, ext, *args, **kwargs):
    r"""Create new file(s) in the testdir by delegating to ``_makefile``.

    :param ext: extension the file should use, including the dot,
        e.g. ``".py"``.
    :param args: treated as strings and joined using newlines; the
        result is written as the contents of a file whose name is based
        on the test function requesting this fixture, e.g.
        ``testdir.makefile('.txt', 'line1', 'line2')``.
    :param kwargs: each keyword is the name of a file and its value is
        written as that file's contents, e.g.
        ``testdir.makefile('.ini', pytest='[pytest]\naddopts=-rs\n')``.
    :return: whatever ``self._makefile(ext, args, kwargs)`` returns.
    """
    positional_lines, named_files = args, kwargs
    return self._makefile(ext, positional_lines, named_files)
def inline_runsource(self, source, *cmdlineargs):
    """Write ``source`` to a python file and run it via ``inline_run``.

    The file is created with ``self.makepyfile(source)`` and its path is
    appended after the extra command line arguments.

    :param source: the source code of the test module.
    :param cmdlineargs: any extra command line arguments to use.
    :return: the result of ``self.inline_run`` (documented upstream as a
        :py:class:`HookRecorder` instance).
    """
    module_path = self.makepyfile(source)
    run_args = list(cmdlineargs)
    run_args.append(module_path)
    return self.inline_run(*run_args)
def parseconfig(self, *args):
    """Build a new pytest Config object from the given command line args.

    The arguments are first passed through ``self._ensure_basetemp``,
    then handed to ``_pytest.config._prepareconfig`` together with
    ``self.plugins``.  If :py:attr:`plugins` has been populated they
    should be plugin modules which will be registered with the
    PluginManager.

    :param args: command line arguments.
    :return: the config object produced by ``_prepareconfig``.
    """
    cmdline = self._ensure_basetemp(args)

    import _pytest.config

    cfg = _pytest.config._prepareconfig(cmdline, self.plugins)
    # The caller may do anything with this half-setup config object, so
    # register its unconfigure step as a finalizer to guarantee cleanup
    # (otherwise capturing could still be active, for example).
    self.request.addfinalizer(cfg._ensure_unconfigure)
    return cfg
def getitem(self, source, funcname="test_func"):
    """Return the test item for a test function.

    The items are obtained from ``self.getitems(source)`` and the first
    one whose ``name`` equals ``funcname`` is returned.

    :param source: the module source.
    :param funcname: the name of the test function for which the item
        must be returned.
    :return: the matching item.
    :raises AssertionError: if no item is named ``funcname``.
    """
    items = self.getitems(source)
    for item in items:
        if item.name == funcname:
            return item
    # Raise explicitly instead of ``assert 0``: assert statements are
    # stripped under ``python -O``, which would make this silently
    # return None.
    raise AssertionError("%r item not found in module:\n%s\nitems: %s" % (
        funcname, source, items))
def getmodulecol(self, source, configargs=(), withinit=False):
    """Return the module collection node for ``source``.

    ``source`` is written to a file with :py:meth:`makepyfile`, named
    after the requesting test function, and the node for that path is
    fetched with ``self.getnode``.  The config used is also stored on
    ``self.config``.

    :param source: the source code of the module to collect.
    :param configargs: any extra arguments to pass to
        :py:meth:`parseconfigure`.
    :param withinit: whether to also write an ``__init__.py`` file so
        the directory is a package.
    :return: the node returned by ``self.getnode(config, path)``.
    """
    module_name = self.request.function.__name__
    path = self.makepyfile(**{module_name: Source(source).strip()})
    if withinit:
        self.makepyfile(__init__="#")
    config = self.parseconfigure(path, *configargs)
    self.config = config
    return self.getnode(config, path)
def matchreport(self, inamepart="",
                names="pytest_runtest_logreport pytest_collectreport", when=None):
    """Return the single recorded report that matches ``inamepart``.

    Reports are taken from ``self.getreports(names=names)``.  A report
    matches when ``inamepart`` is empty or equals one of the
    ``"::"``-separated components of the report's ``nodeid``.

    :param inamepart: node id component to look for; empty matches all.
    :param names: hook names forwarded to ``getreports``.
    :param when: if truthy, only reports whose ``when`` attribute equals
        this value are considered; otherwise passing reports whose
        ``when`` is not ``"call"`` are ignored.
    :return: the one matching report.
    :raises ValueError: if no report, or more than one report, matches.
    """
    matches = []
    for report in self.getreports(names=names):
        if when:
            if getattr(report, 'when', None) != when:
                continue
        else:
            try:
                ignorable = report.when != "call" and report.passed
            except AttributeError:
                # reports lacking these attributes are never ignored here
                ignorable = False
            if ignorable:
                # setup/teardown passing reports - let's ignore those
                continue
        if inamepart and inamepart not in report.nodeid.split("::"):
            continue
        matches.append(report)

    if not matches:
        raise ValueError("could not find test report matching %r: "
                         "no test reports at all!" % (inamepart,))
    if len(matches) > 1:
        raise ValueError(
            "found 2 or more testreports matching %r: %s" % (inamepart, matches))
    return matches[0]