def fail_if_not_removed(method):
    """Decorate a test method to track removal of deprecated code.

    This decorator catches :class:`~deprecation.UnsupportedWarning`
    warnings that occur during testing and causes unittests to fail,
    making it easier to keep track of when code should be removed.

    The returned wrapper carries the name and docstring of ``method``
    (via :func:`functools.wraps`), so the decorated test is still
    reported under its original name.

    :param method: The test callable to wrap.
    :returns: A wrapper that forwards all arguments to ``method`` and
              returns its result.
    :raises: :class:`AssertionError` if an
             :class:`~deprecation.UnsupportedWarning`
             is raised while running the test method.
    """
    # Imported locally so this block does not rely on a module-level import.
    import functools

    @functools.wraps(method)
    def _inner(*args, **kwargs):
        with warnings.catch_warnings(record=True) as caught_warnings:
            # Record every warning, including ones already shown elsewhere.
            warnings.simplefilter("always")
            rv = method(*args, **kwargs)

        for warning in caught_warnings:
            # issubclass() also matches subclasses of UnsupportedWarning.
            if issubclass(warning.category, UnsupportedWarning):
                raise AssertionError(
                    ("%s uses a function that should be removed: %s" %
                     (method, str(warning.message))))
        return rv
    return _inner
# Example source code for Python's simplefilter()
def test_DeprecatedWarning_not_raised(self):
    """Calling a method decorated with ``deprecated_in="2.0"`` while
    ``current_version="1.0"`` must not emit any warning.

    Warnings are turned into exceptions for the duration of the call, so
    any warning makes the test fail.
    """
    expected = "lololol"

    class Test(object):
        @deprecation.deprecated(deprecated_in="2.0",
                                removed_in="3.0",
                                current_version="1.0")
        def method(self):
            """method docstring"""
            return expected

    with warnings.catch_warnings(record=True):
        # "error" escalates any warning to an exception, failing the test.
        warnings.simplefilter("error")
        instance = Test()
        self.assertEqual(instance.method(), expected)
def cache_from_source(path, debug_override=None):
    """**DEPRECATED**

    Given the path to a .py file, return the path to its .pyc file.

    The .py file does not need to exist; this simply returns the path to the
    .pyc file calculated as if the .py file were imported.

    If debug_override is not None, then it must be a boolean and is used in
    place of sys.flags.optimize.

    If sys.implementation.cache_tag is None then NotImplementedError is raised.
    """
    with warnings.catch_warnings():
        # Silence every warning raised by the delegated call.
        warnings.simplefilter('ignore')
        pyc_path = util.cache_from_source(path, debug_override)
    return pyc_path
def test_security_dates_warning(self):
    """Reading ``security_start_date``, ``security_end_date`` and
    ``security_name`` on an Equity must each emit a DeprecationWarning.
    """
    # Build an asset with an end_date
    end_date = pd.Timestamp('2012-01-01', tz='UTC')
    equity_asset = Equity(1, symbol="TESTEQ", end_date=end_date)

    legacy_attributes = (
        "security_start_date",
        "security_end_date",
        "security_name",
    )

    # Catch all warnings
    with warnings.catch_warnings(record=True) as caught:
        # Cause all warnings to always be triggered
        warnings.simplefilter("always")
        for attribute in legacy_attributes:
            getattr(equity_asset, attribute)

        # Verify the warnings: one per attribute read
        self.assertEqual(3, len(caught))
        for entry in caught:
            self.assertTrue(issubclass(entry.category,
                                       DeprecationWarning))
def catch_warnings():
    """Catch warnings in a with block in a list.

    This is a generator: it yields the list that collects the warnings and
    restores ``warnings.filters`` / ``warnings.showwarning`` when resumed or
    closed. (Presumably wrapped by a context-manager decorator outside this
    view -- confirm.)
    """
    # make sure deprecation warnings are active in tests
    # (this filter is installed before the filter list is saved below)
    warnings.simplefilter('default', category=DeprecationWarning)

    log = []

    def showwarning(message, category, filename, lineno, file=None, line=None):
        # locals() records the call's arguments by name (plus any closure
        # variable referenced here).
        log.append(locals())

    saved_showwarning = warnings.showwarning
    saved_filters = warnings.filters
    # Work on a copy so changes made inside the block can be undone.
    warnings.filters = saved_filters[:]
    warnings.showwarning = showwarning
    try:
        yield log
    finally:
        warnings.filters = saved_filters
        warnings.showwarning = saved_showwarning
def test_warnings(self):
    """Are warnings printed at the right time?"""
    # Setting up a more complicated repository
    sub.call(["rm", "-rf", ".git"])
    sub.call(["cp", "-R", "repo_nx.git", ".git"])
    repo_path = os.getcwd()
    nx_log = gitnet.get_log(repo_path)

    # Note: Depending on the environment a warning may or may not be raised.
    # For example, PyCharm uses UTF-8 encoding and thus will not raise the
    # unicode error.
    with warnings.catch_warnings(record=True) as caught:
        # Ensure warnings are being shown
        warnings.simplefilter("always")
        # Trigger Warning
        nx_log.detect_dup_emails()
        # Check warning occurs: at most one warning is acceptable
        self.assertIn(len(caught), (0, 1))
def test_undecorated_coroutine(self):
    """An ``async def`` test method without a decorator must be reported
    as exactly one error whose text contains "should be decorated".
    """
    # The embedded source must be indented to compile. It is written flush
    # left so it is valid whether or not exec_test strips common leading
    # whitespace.
    namespace = exec_test(globals(), locals(), """
class Test(AsyncTestCase):
    async def test_coro(self):
        pass
""")

    test_class = namespace['Test']
    test = test_class('test_coro')
    result = unittest.TestResult()

    # Silence "RuntimeWarning: coroutine 'test_coro' was never awaited".
    with warnings.catch_warnings():
        warnings.simplefilter('ignore')
        test.run(result)

    self.assertEqual(len(result.errors), 1)
    self.assertIn("should be decorated", result.errors[0][1])
def twisted_coroutine_fetch(self, url, runner):
    """Fetch ``url`` with a Twisted ``Agent`` from inside a coroutine.

    The coroutine is scheduled on ``self.io_loop``, ``runner()`` is called,
    and whatever the coroutine stored as the body is returned.
    """
    # One-element list so the nested coroutine can hand its result back.
    holder = [None]

    @gen.coroutine
    def fetch():
        # This is simpler than the non-coroutine version, but it cheats
        # by reading the body in one blob instead of streaming it with
        # a Protocol.
        agent = Agent(self.reactor)
        response = yield agent.request(b'GET', utf8(url))
        with warnings.catch_warnings():
            # readBody has a buggy DeprecationWarning in Twisted 15.0:
            # https://twistedmatrix.com/trac/changeset/43379
            warnings.simplefilter('ignore', category=DeprecationWarning)
            holder[0] = yield readBody(response)
        self.stop_loop()

    self.io_loop.add_callback(fetch)
    runner()
    return holder[0]
def test_undecorated_coroutine(self):
    """An ``async def`` test method without a decorator must be reported
    as exactly one error whose text contains "should be decorated".
    """
    # The embedded source must be indented to compile. It is written flush
    # left so it is valid whether or not exec_test strips common leading
    # whitespace.
    namespace = exec_test(globals(), locals(), """
class Test(AsyncTestCase):
    async def test_coro(self):
        pass
""")

    test_class = namespace['Test']
    test = test_class('test_coro')
    result = unittest.TestResult()

    # Silence "RuntimeWarning: coroutine 'test_coro' was never awaited".
    with warnings.catch_warnings():
        warnings.simplefilter('ignore')
        test.run(result)

    self.assertEqual(len(result.errors), 1)
    self.assertIn("should be decorated", result.errors[0][1])
def twisted_coroutine_fetch(self, url, runner):
    """Fetch ``url`` with a Twisted ``Agent`` from inside a coroutine.

    The coroutine is scheduled on ``self.io_loop``, ``runner()`` is called,
    and whatever the coroutine stored as the body is returned.
    """
    # One-element list so the nested coroutine can hand its result back.
    holder = [None]

    @gen.coroutine
    def fetch():
        # This is simpler than the non-coroutine version, but it cheats
        # by reading the body in one blob instead of streaming it with
        # a Protocol.
        agent = Agent(self.reactor)
        response = yield agent.request(b'GET', utf8(url))
        with warnings.catch_warnings():
            # readBody has a buggy DeprecationWarning in Twisted 15.0:
            # https://twistedmatrix.com/trac/changeset/43379
            warnings.simplefilter('ignore', category=DeprecationWarning)
            holder[0] = yield readBody(response)
        self.stop_loop()

    self.io_loop.add_callback(fetch)
    runner()
    return holder[0]
def test_undecorated_coroutine(self):
    """An ``async def`` test method without a decorator must be reported
    as exactly one error whose text contains "should be decorated".
    """
    # The embedded source must be indented to compile. It is written flush
    # left so it is valid whether or not exec_test strips common leading
    # whitespace.
    namespace = exec_test(globals(), locals(), """
class Test(AsyncTestCase):
    async def test_coro(self):
        pass
""")

    test_class = namespace['Test']
    test = test_class('test_coro')
    result = unittest.TestResult()

    # Silence "RuntimeWarning: coroutine 'test_coro' was never awaited".
    with warnings.catch_warnings():
        warnings.simplefilter('ignore')
        test.run(result)

    self.assertEqual(len(result.errors), 1)
    self.assertIn("should be decorated", result.errors[0][1])
def twisted_coroutine_fetch(self, url, runner):
    """Fetch ``url`` with a Twisted ``Agent`` from inside a coroutine.

    The coroutine is scheduled on ``self.io_loop``, ``runner()`` is called,
    and whatever the coroutine stored as the body is returned.
    """
    # One-element list so the nested coroutine can hand its result back.
    holder = [None]

    @gen.coroutine
    def fetch():
        # This is simpler than the non-coroutine version, but it cheats
        # by reading the body in one blob instead of streaming it with
        # a Protocol.
        agent = Agent(self.reactor)
        response = yield agent.request(b'GET', utf8(url))
        with warnings.catch_warnings():
            # readBody has a buggy DeprecationWarning in Twisted 15.0:
            # https://twistedmatrix.com/trac/changeset/43379
            warnings.simplefilter('ignore', category=DeprecationWarning)
            holder[0] = yield readBody(response)
        self.stop_loop()

    self.io_loop.add_callback(fetch)
    runner()
    return holder[0]
def execute(self):
    """Run ``self.run()`` with every warning routed to ``self.log.warning``.

    Returns whatever ``self.run()`` returns. ``SystemExit`` propagates
    unchanged; any other exception is logged through ``self.log.critical``
    with its traceback and the process exits with status 1.
    """
    def log_warning(message, category, filename, lineno, file=None, line=None):
        # Format like the stock handler, but send the text to the logger.
        text = warnings.formatwarning(message, category, filename, lineno, line)
        self.log.warning(text.strip())

    with warnings.catch_warnings():
        warnings.simplefilter("always")
        # catch_warnings() restores the previous showwarning on exit.
        warnings.showwarning = log_warning
        try:
            result = self.run()
        except SystemExit:
            raise
        except:  # deliberately catches everything else, as a last resort
            self.log.critical(traceback.format_exc().strip())
            sys.exit(1)
        return result
def configure_logging(logging_config, logging_settings):
    """Set up warning routing and apply the configured logging setup.

    ``logging_config`` is a dotted path passed to ``import_string`` to obtain
    the configuration function; ``logging_settings`` is handed to that
    function when truthy. ``DEFAULT_LOGGING`` is applied first whenever
    ``logging_config`` is truthy.
    """
    if not sys.warnoptions:
        # Route warnings through python logging
        logging.captureWarnings(True)
        # RemovedInNextVersionWarning is a subclass of DeprecationWarning which
        # is hidden by default, hence we force the "default" behavior
        warnings.simplefilter("default", RemovedInNextVersionWarning)

    if not logging_config:
        return

    # First find the logging configuration function ...
    configure = import_string(logging_config)

    logging.config.dictConfig(DEFAULT_LOGGING)

    # ... then invoke it with the logging settings
    if logging_settings:
        configure(logging_settings)
def catch_warnings():
    """Catch warnings in a with block in a list.

    This is a generator: it yields the list that collects the warnings and
    restores ``warnings.filters`` / ``warnings.showwarning`` when resumed or
    closed. (Presumably wrapped by a context-manager decorator outside this
    view -- confirm.)
    """
    # make sure deprecation warnings are active in tests
    # (this filter is installed before the filter list is saved below)
    warnings.simplefilter('default', category=DeprecationWarning)

    log = []

    def showwarning(message, category, filename, lineno, file=None, line=None):
        # locals() records the call's arguments by name (plus any closure
        # variable referenced here).
        log.append(locals())

    saved_showwarning = warnings.showwarning
    saved_filters = warnings.filters
    # Work on a copy so changes made inside the block can be undone.
    warnings.filters = saved_filters[:]
    warnings.showwarning = showwarning
    try:
        yield log
    finally:
        warnings.filters = saved_filters
        warnings.showwarning = saved_showwarning
def guesscaption(simple=False):
    """Print the raw guessed lines, then return the non-gibberish ones.

    Each entry from ``guessformat()`` is rendered with ``guessline``; lines
    rejected by ``isgibber`` are dropped. Kept lines have the space before
    "!" and "?" removed and are newline-terminated in the returned string.
    """
    candidates = guessformat()

    print("raw: ")
    for candidate in candidates:
        print(guessline(candidate, True))
    print()

    kept = []
    with warnings.catch_warnings():
        # Suppress any warnings emitted while rendering/filtering lines.
        warnings.simplefilter("ignore")
        for candidate in candidates:
            rendered = guessline(candidate, simple=simple)
            if isgibber(rendered):
                continue
            kept.append(rendered.replace(" !", "!").replace(" ?", "?") + "\n")
    return "".join(kept)
def __enter__(self):
    """Start recording warnings on ``self._module`` and return ``self``.

    Saves the module's filter list and ``showwarning`` hook on the
    instance, installs a copy of the filters and a recording hook, and
    switches the filter to 'always'.

    Raises RuntimeError if this object has already been entered.
    """
    if self._entered:
        __tracebackhide__ = True
        raise RuntimeError("Cannot enter %r twice" % self)
    self._entered = True

    module = self._module
    self._filters = module.filters
    module.filters = self._filters[:]
    self._showwarning = module.showwarning

    def record_and_forward(message, category, filename, lineno,
                           file=None, line=None):
        entry = RecordedWarning(
            message, category, filename, lineno, file, line)
        self._list.append(entry)

        # still perform old showwarning functionality
        self._showwarning(
            message, category, filename, lineno, file=file, line=line)

    module.showwarning = record_and_forward

    # allow the same warning to be raised more than once
    module.simplefilter('always')
    return self
def deprecated(func_replacement):
    """
    Use this decorator on functions that are marked as deprecated.
    It takes a single argument of the function name it's being
    replaced with.

    Every call to the decorated function emits a DeprecationWarning
    attributed to the caller and then runs the original function, returning
    its result. The warning filters are only changed for the duration of the
    ``warnings.warn`` call, so the caller's filter configuration is left
    untouched.
    """
    def _deprecated(func):
        @functools.wraps(func)
        def new_func(*args, **kwargs):
            # Scope the filter change: catch_warnings() restores the previous
            # filters on exit instead of leaving a global 'default' entry.
            with warnings.catch_warnings():
                # Make sure this warning is emitted on every call.
                warnings.simplefilter('always', DeprecationWarning)
                warnings.warn(
                    'Call to deprecated function {}. Use new function: {}() instead.'
                    .format(func.__name__, func_replacement),
                    category=DeprecationWarning, stacklevel=2)
            return func(*args, **kwargs)
        return new_func
    return _deprecated
def __enter__(self):
    """Enter the parent context, then optionally reset the warning filters
    and per-module warning registries.

    Returns whatever the parent class's ``__enter__`` returned.
    """
    # let parent class archive filter state
    result = super(reset_warnings, self).__enter__()

    # reset the filter to list everything
    if self._reset_filter:
        warnings.resetwarnings()
        warnings.simplefilter(self._reset_filter)

    # archive and clear the __warningregistry__ key for all modules
    # that match the 'reset' pattern.
    pattern = self._reset_registry
    if pattern:
        self._orig_registry = {}
        backup = self._orig_registry
        # Iterate over a snapshot of sys.modules.
        for name, module in list(sys.modules.items()):
            if module is not None and pattern.match(name):
                registry = getattr(module, "__warningregistry__", None)
                if registry:
                    backup[name] = registry.copy()
                    registry.clear()
    return result
def get_credentials(flags):
    """Gets valid user credentials from storage.

    If nothing has been stored, or if the stored credentials are invalid,
    the OAuth2 flow is completed to obtain the new credentials.

    Returns:
        Credentials, the obtained credential.
    """
    store = Storage(flags.credfile)
    with warnings.catch_warnings():
        # Reading the store may warn; those warnings are suppressed.
        warnings.simplefilter("ignore")
        credentials = store.get()

    # Stored credentials are usable as-is.
    if credentials and not credentials.invalid:
        return credentials

    flow = client.OAuth2WebServerFlow(**CLIENT_CREDENTIAL)
    credentials = tools.run_flow(flow, store, flags)
    print('credential file saved at\n\t' + flags.credfile)
    return credentials
# Source file: _test_unittest2_with.py
# Project: devsecops-example-helloworld
# Author: boozallen
# Project source
# File source
# Views: 26
# Favorites: 0
# Likes: 0
# Comments: 0
def testAssertWarnsCallable(self):
    """Exercise the callable form of ``assertWarns``: success cases,
    failure cases, and its interaction with the active warning filters.
    """
    def emit_runtime_warning():
        warnings.warn("foo", RuntimeWarning)

    # Success when the right warning is triggered, even several times
    for _ in range(2):
        self.assertWarns(RuntimeWarning, emit_runtime_warning)
    # A tuple of warning classes is accepted
    accepted = (DeprecationWarning, RuntimeWarning)
    self.assertWarns(accepted, emit_runtime_warning)
    # *args and **kwargs also work
    self.assertWarns(RuntimeWarning,
                     warnings.warn, "foo", category=RuntimeWarning)
    # Failure when no warning is triggered
    with self.assertRaises(self.failureException):
        self.assertWarns(RuntimeWarning, lambda: 0)
    # Failure when another warning is triggered
    with catch_warnings():
        # Force default filter (in case tests are run with -We)
        warnings.simplefilter("default", RuntimeWarning)
        with self.assertRaises(self.failureException):
            self.assertWarns(DeprecationWarning, emit_runtime_warning)
    # Filters for other warnings are not modified
    with catch_warnings():
        warnings.simplefilter("error", RuntimeWarning)
        with self.assertRaises(RuntimeWarning):
            self.assertWarns(DeprecationWarning, emit_runtime_warning)
def test_loadTestsFromModule__use_load_tests_other_bad_keyword(self):
    """``loadTestsFromModule`` must raise TypeError naming the first
    unexpected keyword argument, ignoring ``use_load_tests``.
    """
    m = types.ModuleType('m')

    class MyTestCase(unittest.TestCase):
        def test(self):
            pass
    m.testcase_1 = MyTestCase

    load_tests_args = []

    def load_tests(loader, tests, pattern):
        self.assertIsInstance(tests, unittest.TestSuite)
        load_tests_args.extend((loader, tests, pattern))
        return tests
    m.load_tests = load_tests

    loader = unittest.TestLoader()
    with warnings.catch_warnings():
        # 'ignore' is the valid action for suppressing warnings; 'never' is
        # not an accepted filter action and is rejected by simplefilter().
        warnings.simplefilter('ignore')
        with self.assertRaises(TypeError) as cm:
            loader.loadTestsFromModule(
                m, use_load_tests=False, very_bad=True, worse=False)
    self.assertEqual(type(cm.exception), TypeError)
    # The error message names the first bad argument alphabetically,
    # however use_load_tests (which sorts first) is ignored.
    self.assertEqual(
        str(cm.exception),
        "loadTestsFromModule() got an unexpected keyword argument 'very_bad'")
def _build_program(self):
    """Join the common header pieces and every compile unit's pieces into
    one source string, then build and return it as a single program.
    """
    header_pieces = self.common_header.pieces
    unit_pieces = itertools.chain.from_iterable(
        cu.pieces for cu in self._compile_units)
    code = "\n".join(itertools.chain(header_pieces, unit_pieces))
    program = pyopencl.Program(self.context, code)

    with warnings.catch_warnings():
        # Intel OpenCL generates non empty output and that causes warnings
        # from pyopencl. We just silence them.
        warnings.simplefilter("ignore")
        built = program.build(options=DEFAULT_COMPILER_OPTIONS)
    return built

    # Working around bug in pyopencl.Program.compile in pyopencl
    # (https://lists.tiker.net/pipermail/pyopencl/2015-September/001986.html)
    # TODO: Fix this in OpenCL
    # compiled_units = [unit.compile(self.context, self.common_header.pieces)
    #                   for unit in self._compile_units]
    # return pyopencl.link_program(self.context, compiled_units)
def _build_program(self):
    """Join the common header pieces and every compile unit's pieces into
    one source string, then build and return it as a single program.
    """
    header_pieces = self.common_header.pieces
    unit_pieces = itertools.chain.from_iterable(
        cu.pieces for cu in self._compile_units)
    code = "\n".join(itertools.chain(header_pieces, unit_pieces))
    program = pyopencl.Program(self.context, code)

    with warnings.catch_warnings():
        # Intel OpenCL generates non empty output and that causes warnings
        # from pyopencl. We just silence them.
        warnings.simplefilter("ignore")
        built = program.build(options=DEFAULT_COMPILER_OPTIONS)
    return built

    # Working around bug in pyopencl.Program.compile in pyopencl
    # (https://lists.tiker.net/pipermail/pyopencl/2015-September/001986.html)
    # TODO: Fix this in OpenCL
    # compiled_units = [unit.compile(self.context, self.common_header.pieces)
    #                   for unit in self._compile_units]
    # return pyopencl.link_program(self.context, compiled_units)
def test_bool_flat_indexing_invalid_nr_elements(self, level=rlevel):
    """Boolean-mask assignment with a value array of the wrong length (9 or
    11 elements for a 10-element mask) must raise.
    """
    mask_source = np.ones(10, dtype=float)
    target = np.array((15,), dtype=float)
    wrong_sizes = (9, 11)

    def ia(x, s, v):
        x[(s > 0)] = v

    # After removing deprecation, the following are ValueErrors.
    # This might seem odd as compared to the value error below. This
    # is due to the fact that the new code always uses "nonzero" logic
    # and the boolean special case is not taken.
    with warnings.catch_warnings():
        warnings.simplefilter('ignore', DeprecationWarning)
        warnings.simplefilter('ignore', np.VisibleDeprecationWarning)
        for size in wrong_sizes:
            self.assertRaises(IndexError, ia, target, mask_source,
                              np.zeros(size, dtype=float))
    # Old special case (different code path):
    for size in wrong_sizes:
        self.assertRaises(ValueError, ia, target.flat, mask_source,
                          np.zeros(size, dtype=float))
def test_ticket_1539(self):
    """``np.dot`` on empty arrays must not raise TypeError for any pair of
    numeric scalar types (timedelta64 excluded).

    Raises AssertionError listing every (type, type) pair that failed.
    """
    # np.sctypeDict is used because the np.typeDict alias no longer exists
    # in current NumPy.
    dtypes = [x for x in np.sctypeDict.values()
              if (issubclass(x, np.number)
                  and not issubclass(x, np.timedelta64))]
    a = np.array([], dtypes[0])
    failures = []

    # ComplexWarning lives in np.exceptions on current NumPy; fall back to
    # the top-level name where that namespace does not exist.
    complex_warning = getattr(np, "exceptions", np).ComplexWarning

    # ignore complex warnings
    with warnings.catch_warnings():
        warnings.simplefilter('ignore', complex_warning)
        for x in dtypes:
            b = a.astype(x)
            for y in dtypes:
                c = a.astype(y)
                try:
                    np.dot(b, c)
                except TypeError:
                    failures.append((x, y))
    if failures:
        raise AssertionError("Failures: %r" % failures)
def test_warnings(self):
    """With ``np.errstate(all="warn")`` each floating-point error kind must
    add exactly one warning whose message names that kind.
    """
    # (operation that triggers the condition, expected message fragment)
    checks = (
        (lambda: np.divide(1, 0.), "divide by zero"),
        (lambda: np.array(1e300) * np.array(1e300), "overflow"),
        (lambda: np.array(np.inf) - np.array(np.inf), "invalid value"),
        (lambda: np.array(1e-300) * np.array(1e-300), "underflow"),
    )

    # test warning code path
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        with np.errstate(all="warn"):
            for expected_count, (trigger, fragment) in enumerate(checks, 1):
                trigger()
                self.assertEqual(len(caught), expected_count)
                # The newest entry belongs to the operation just run.
                self.assertTrue(fragment in str(caught[-1].message))
def test_prepend_not_one(self):
    """Assigning values whose extra leading dimensions are not all ones
    must be rejected: ValueError for ``...``, DeprecationWarning (raised as
    an error here) for the fancy-index cases.
    """
    assign = self.assign
    s_ = np.s_
    target = np.zeros(5)

    # Too large and not only ones.
    assert_raises(ValueError, assign, target, s_[...], np.ones((2, 1)))

    fancy_cases = (
        (s_[[1, 2, 3],], np.ones((2, 1))),
        (s_[[[1], [2]],], np.ones((2, 2, 1))),
    )
    with warnings.catch_warnings():
        # Will be a ValueError as well.
        warnings.simplefilter("error", DeprecationWarning)
        for index, values in fancy_cases:
            assert_raises(DeprecationWarning, assign, target, index, values)
def test_closing_fid(self):
    """Load the same .npz file 1024 times without closing it explicitly;
    any failure to load is reported as an AssertionError.
    """
    # Test that issue #1517 (too many opened files) remains closed
    # It might be a "weak" test since failed to get triggered on
    # e.g. Debian sid of 2012 Jul 05 but was reported to
    # trigger the failure on Ubuntu 10.04:
    # http://projects.scipy.org/numpy/ticket/1517#comment:2
    with temppath(suffix='.npz') as npz_path:
        np.savez(npz_path, data='LOVELY LOAD')
        # We need to check if the garbage collector can properly close
        # numpy npz file returned by np.load when their reference count
        # goes to zero.  Python 3 running in debug mode raises a
        # ResourceWarning when file closing is left to the garbage
        # collector, so we catch the warnings.  Because ResourceWarning
        # is unknown in Python < 3.x, we take the easy way out and
        # catch all warnings.
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            for _ in range(1024):
                try:
                    np.load(npz_path)["data"]
                except Exception as exc:
                    message = "Failed to load data from a file: %s" % exc
                    raise AssertionError(message)
def test_ddof_too_big(self):
    """For ``np.nanvar`` / ``np.nanstd`` with ddof 0..4 the result must be
    NaN exactly for rows where ``ddof >= len(row)``, with a single
    RuntimeWarning when any row is affected and no warning otherwise.
    """
    nanfuncs = [np.nanvar, np.nanstd]
    stdfuncs = [np.var, np.std]
    row_sizes = [len(row) for row in _rdat]

    for nan_func, _ in zip(nanfuncs, stdfuncs):
        for ddof in range(5):
            with warnings.catch_warnings(record=True) as caught:
                warnings.simplefilter('always')
                expect_nan = [ddof >= size for size in row_sizes]
                result = nan_func(_ndat, axis=1, ddof=ddof)
                assert_equal(np.isnan(result), expect_nan)
                if not any(expect_nan):
                    assert_(len(caught) == 0)
                else:
                    assert_(len(caught) == 1)
                    assert_(issubclass(caught[0].category, RuntimeWarning))