def fail_if_not_removed(method):
"""Decorate a test method to track removal of deprecated code
This decorator catches :class:`~deprecation.UnsupportedWarning`
warnings that occur during testing and causes unittests to fail,
making it easier to keep track of when code should be removed.
:raises: :class:`AssertionError` if an
:class:`~deprecation.UnsupportedWarning`
is raised while running the test method.
"""
def _inner(*args, **kwargs):
with warnings.catch_warnings(record=True) as caught_warnings:
warnings.simplefilter("always")
rv = method(*args, **kwargs)
for warning in caught_warnings:
if warning.category == UnsupportedWarning:
raise AssertionError(
("%s uses a function that should be removed: %s" %
(method, str(warning.message))))
return rv
return _inner
python类catch_warnings()的实例源码
def test_DeprecatedWarning_not_raised(self):
ret_val = "lololol"
class Test(object):
@deprecation.deprecated(deprecated_in="2.0",
removed_in="3.0",
current_version="1.0")
def method(self):
"""method docstring"""
return ret_val
with warnings.catch_warnings(record=True):
# If a warning is raised it'll be an exception, so we'll fail.
warnings.simplefilter("error")
sot = Test()
self.assertEqual(sot.method(), ret_val)
def cache_from_source(path, debug_override=None):
"""**DEPRECATED**
Given the path to a .py file, return the path to its .pyc file.
The .py file does not need to exist; this simply returns the path to the
.pyc file calculated as if the .py file were imported.
If debug_override is not None, then it must be a boolean and is used in
place of sys.flags.optimize.
If sys.implementation.cache_tag is None then NotImplementedError is raised.
"""
with warnings.catch_warnings():
warnings.simplefilter('ignore')
return util.cache_from_source(path, debug_override)
def test_security_dates_warning(self):
# Build an asset with an end_date
eq_end = pd.Timestamp('2012-01-01', tz='UTC')
equity_asset = Equity(1, symbol="TESTEQ", end_date=eq_end)
# Catch all warnings
with warnings.catch_warnings(record=True) as w:
# Cause all warnings to always be triggered
warnings.simplefilter("always")
equity_asset.security_start_date
equity_asset.security_end_date
equity_asset.security_name
# Verify the warning
self.assertEqual(3, len(w))
for warning in w:
self.assertTrue(issubclass(warning.category,
DeprecationWarning))
def test_analogsignal_shortening_warning(self):
nio = NeuralynxIO(self.sn, use_cache='never')
with reset_warning_registry():
with warnings.catch_warnings(record=True) as w:
seg = Segment('testsegment')
nio.read_ncs(os.path.join(self.sn, 'Tet3a.ncs'), seg)
self.assertGreater(len(w), 0)
self.assertTrue(issubclass(w[0].category, UserWarning))
self.assertTrue('Analogsignalarray was shortened due to gap in'
' recorded data of file'
in str(w[0].message))
# This class is copied from
# 'http://bugs.python.org/file40031/reset_warning_registry.py' by Eli Collins
# and is related to http://bugs.python.org/issue21724 Python<3.4
def test_analogsignal_shortening_warning(self):
nio = NeuralynxIO(self.sn, use_cache='never')
with reset_warning_registry():
with warnings.catch_warnings(record=True) as w:
seg = Segment('testsegment')
nio.read_ncs(os.path.join(self.sn, 'Tet3a.ncs'), seg)
self.assertGreater(len(w), 0)
self.assertTrue(issubclass(w[0].category, UserWarning))
self.assertTrue('Analogsignalarray was shortened due to gap in'
' recorded data of file'
in str(w[0].message))
# This class is copied from
# 'http://bugs.python.org/file40031/reset_warning_registry.py' by Eli Collins
# and is related to http://bugs.python.org/issue21724 Python<3.4
test_batch_query.py 文件源码
项目:deb-python-cassandra-driver
作者: openstack
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def test_callbacks_work_multiple_times(self):
"""
Tests that multiple executions of execute on a batch statement
logs a warning, and that we don't encounter an attribute error.
@since 3.1
@jira_ticket PYTHON-445
@expected_result warning message is logged
@test_category object_mapper
"""
call_history = []
def my_callback(*args, **kwargs):
call_history.append(args)
with warnings.catch_warnings(record=True) as w:
with BatchQuery() as batch:
batch.add_callback(my_callback)
batch.execute()
batch.execute()
self.assertEqual(len(w), 2) # package filter setup to warn always
self.assertRegexpMatches(str(w[0].message), r"^Batch.*multiple.*")
test_batch_query.py 文件源码
项目:deb-python-cassandra-driver
作者: openstack
项目源码
文件源码
阅读 29
收藏 0
点赞 0
评论 0
def test_disable_multiple_callback_warning(self):
"""
Tests that multiple executions of a batch statement
don't log a warning when warn_multiple_exec flag is set, and
that we don't encounter an attribute error.
@since 3.1
@jira_ticket PYTHON-445
@expected_result warning message is logged
@test_category object_mapper
"""
call_history = []
def my_callback(*args, **kwargs):
call_history.append(args)
with patch('cassandra.cqlengine.query.BatchQuery.warn_multiple_exec', False):
with warnings.catch_warnings(record=True) as w:
with BatchQuery() as batch:
batch.add_callback(my_callback)
batch.execute()
batch.execute()
self.assertFalse(w)
def assert_warnings(fn, warning_msgs, regex=False):
"""Assert that each of the given warnings are emitted by fn."""
from .assertions import eq_
with warnings.catch_warnings(record=True) as log:
# ensure that nothing is going into __warningregistry__
warnings.filterwarnings("always")
result = fn()
for warning in log:
popwarn = warning_msgs.pop(0)
if regex:
assert re.match(popwarn, str(warning.message))
else:
eq_(popwarn, str(warning.message))
return result
def test_warnings(self):
"""Are warnings printed at the right time?"""
# Setting up a more complicated repository
sub.call(["rm", "-rf", ".git"])
sub.call(["cp", "-R", "repo_nx.git", ".git"])
path = os.getcwd()
nx_log = gitnet.get_log(path)
# Note: Depending on the environment a warning may or may not be raised. For example, PyCharm uses UTF-8
# encoding and thus will not raised the unicode error.
with warnings.catch_warnings(record=True) as w:
# Ensure warnings are being shown
warnings.simplefilter("always")
# Trigger Warning
nx_log.detect_dup_emails()
# Check warning occurs
self.assertTrue(len(w) == 0 or len(w) == 1)
def test_undecorated_coroutine(self):
namespace = exec_test(globals(), locals(), """
class Test(AsyncTestCase):
async def test_coro(self):
pass
""")
test_class = namespace['Test']
test = test_class('test_coro')
result = unittest.TestResult()
# Silence "RuntimeWarning: coroutine 'test_coro' was never awaited".
with warnings.catch_warnings():
warnings.simplefilter('ignore')
test.run(result)
self.assertEqual(len(result.errors), 1)
self.assertIn("should be decorated", result.errors[0][1])
def twisted_coroutine_fetch(self, url, runner):
body = [None]
@gen.coroutine
def f():
# This is simpler than the non-coroutine version, but it cheats
# by reading the body in one blob instead of streaming it with
# a Protocol.
client = Agent(self.reactor)
response = yield client.request(b'GET', utf8(url))
with warnings.catch_warnings():
# readBody has a buggy DeprecationWarning in Twisted 15.0:
# https://twistedmatrix.com/trac/changeset/43379
warnings.simplefilter('ignore', category=DeprecationWarning)
body[0] = yield readBody(response)
self.stop_loop()
self.io_loop.add_callback(f)
runner()
return body[0]
def test_undecorated_coroutine(self):
namespace = exec_test(globals(), locals(), """
class Test(AsyncTestCase):
async def test_coro(self):
pass
""")
test_class = namespace['Test']
test = test_class('test_coro')
result = unittest.TestResult()
# Silence "RuntimeWarning: coroutine 'test_coro' was never awaited".
with warnings.catch_warnings():
warnings.simplefilter('ignore')
test.run(result)
self.assertEqual(len(result.errors), 1)
self.assertIn("should be decorated", result.errors[0][1])
def twisted_coroutine_fetch(self, url, runner):
body = [None]
@gen.coroutine
def f():
# This is simpler than the non-coroutine version, but it cheats
# by reading the body in one blob instead of streaming it with
# a Protocol.
client = Agent(self.reactor)
response = yield client.request(b'GET', utf8(url))
with warnings.catch_warnings():
# readBody has a buggy DeprecationWarning in Twisted 15.0:
# https://twistedmatrix.com/trac/changeset/43379
warnings.simplefilter('ignore', category=DeprecationWarning)
body[0] = yield readBody(response)
self.stop_loop()
self.io_loop.add_callback(f)
runner()
return body[0]
def test_undecorated_coroutine(self):
namespace = exec_test(globals(), locals(), """
class Test(AsyncTestCase):
async def test_coro(self):
pass
""")
test_class = namespace['Test']
test = test_class('test_coro')
result = unittest.TestResult()
# Silence "RuntimeWarning: coroutine 'test_coro' was never awaited".
with warnings.catch_warnings():
warnings.simplefilter('ignore')
test.run(result)
self.assertEqual(len(result.errors), 1)
self.assertIn("should be decorated", result.errors[0][1])
def twisted_coroutine_fetch(self, url, runner):
body = [None]
@gen.coroutine
def f():
# This is simpler than the non-coroutine version, but it cheats
# by reading the body in one blob instead of streaming it with
# a Protocol.
client = Agent(self.reactor)
response = yield client.request(b'GET', utf8(url))
with warnings.catch_warnings():
# readBody has a buggy DeprecationWarning in Twisted 15.0:
# https://twistedmatrix.com/trac/changeset/43379
warnings.simplefilter('ignore', category=DeprecationWarning)
body[0] = yield readBody(response)
self.stop_loop()
self.io_loop.add_callback(f)
runner()
return body[0]
def test_callbacks_work_multiple_times(self):
"""
Tests that multiple executions of execute on a batch statement
logs a warning, and that we don't encounter an attribute error.
@since 3.1
@jira_ticket PYTHON-445
@expected_result warning message is logged
@test_category object_mapper
"""
call_history = []
def my_callback(*args, **kwargs):
call_history.append(args)
with warnings.catch_warnings(record=True) as w:
batch = Batch(self.conn)
batch.add_callback(my_callback)
batch.execute_batch()
batch.execute_batch()
self.assertEqual(len(w), 1)
self.assertRegexpMatches(str(w[0].message), r"^Batch.*multiple.*")
def execute(self):
def showwarning(message, category, filename, fileno, file=None, line=None):
msg = warnings.formatwarning(message, category, filename, fileno, line)
self.log.warning(msg.strip())
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.showwarning = showwarning
try:
return self.run()
except SystemExit:
raise
except:
self.log.critical(traceback.format_exc().strip())
sys.exit(1)
def guesscaption(simple=False):
output = ""
gf = guessformat()
print("raw: ")
for g in gf:
print(guessline(g, True))
print()
with warnings.catch_warnings():
warnings.simplefilter("ignore")
for g in gf:
gl = guessline(g, simple=simple)
if not isgibber(gl):
output += gl.replace(" !", "!").replace(" ?", "?") + "\n"
return output
def test_backwards_compat(self):
with warnings.catch_warnings(record=True) as w:
class NewProperty(properties.Property):
info_text = 'new property'
def info(self):
return self.info_text
assert len(w) == 2
assert issubclass(w[0].category, FutureWarning)
np = NewProperty('')
assert getattr(np, 'class_info', None) == 'new property'
assert getattr(np, 'info', None) == 'new property'
def test_norm_hash_name(self):
"""norm_hash_name()"""
from itertools import chain
from passlib.crypto.digest import norm_hash_name, _known_hash_names
# snapshot warning state, ignore unknown hash warnings
ctx = warnings.catch_warnings()
ctx.__enter__()
self.addCleanup(ctx.__exit__)
warnings.filterwarnings("ignore", '.*unknown hash')
# test string types
self.assertEqual(norm_hash_name(u("MD4")), "md4")
self.assertEqual(norm_hash_name(b"MD4"), "md4")
self.assertRaises(TypeError, norm_hash_name, None)
# test selected results
for row in chain(_known_hash_names, self.norm_hash_samples):
for idx, format in enumerate(self.norm_hash_formats):
correct = row[idx]
for value in row:
result = norm_hash_name(value, format)
self.assertEqual(result, correct,
"name=%r, format=%r:" % (value,
format))
def test_bad_val_in_column_descriptions():
np.random.seed(0)
df_titanic_train, df_titanic_test = utils.get_titanic_binary_classification_dataset()
column_descriptions = {
'survived': 'output'
, 'embarked': 'categorical'
, 'pclass': 'categorical'
, 'fare': 'this_is_a_bad_value'
}
with warnings.catch_warnings(record=True) as w:
ml_predictor = Predictor(type_of_estimator='classifier', column_descriptions=column_descriptions)
print('we should be throwing a warning for the user to give them useful feedback')
assert len(w) == 1
def test_throws_warning_when_fl_data_equals_df_train():
df_titanic_train, df_titanic_test = utils.get_titanic_binary_classification_dataset()
column_descriptions = {
'survived': 'output'
, 'embarked': 'categorical'
, 'pclass': 'categorical'
}
ml_predictor = Predictor(type_of_estimator='classifier', column_descriptions=column_descriptions)
with warnings.catch_warnings(record=True) as w:
try:
ml_predictor.train(df_titanic_train, feature_learning=True, fl_data=df_titanic_train)
except KeyError as e:
pass
# We should not be getting to this line- we should be throwing an error above
for thing in w:
print(thing)
assert len(w) == 1
def test_remove_missing():
df = pd.DataFrame({'a': [1.0, np.NaN, 3, np.inf],
'b': [1, 2, 3, 4]})
df2 = pd.DataFrame({'a': [1.0, 3, np.inf],
'b': [1, 3, 4]})
df3 = pd.DataFrame({'a': [1.0, 3],
'b': [1, 3]})
with warnings.catch_warnings(record=True) as w:
res = remove_missing(df, na_rm=True, vars=['b'])
res.equals(df)
res = remove_missing(df)
res.equals(df2)
res = remove_missing(df, na_rm=True, finite=True)
res.equals(df3)
assert len(w) == 1
def get_credentials(flags):
"""Gets valid user credentials from storage.
If nothing has been stored, or if the stored credentials are invalid,
the OAuth2 flow is completed to obtain the new credentials.
Returns:
Credentials, the obtained credential.
"""
store = Storage(flags.credfile)
with warnings.catch_warnings():
warnings.simplefilter("ignore")
credentials = store.get()
if not credentials or credentials.invalid:
flow = client.OAuth2WebServerFlow(**CLIENT_CREDENTIAL)
credentials = tools.run_flow(flow, store, flags)
print('credential file saved at\n\t' + flags.credfile)
return credentials
def test_loadTestsFromModule__load_tests(self):
m = types.ModuleType('m')
class MyTestCase(unittest2.TestCase):
def test(self):
pass
m.testcase_1 = MyTestCase
load_tests_args = []
def load_tests(loader, tests, pattern):
self.assertIsInstance(tests, unittest2.TestSuite)
load_tests_args.extend((loader, tests, pattern))
return tests
m.load_tests = load_tests
loader = unittest2.TestLoader()
suite = loader.loadTestsFromModule(m)
self.assertIsInstance(suite, unittest2.TestSuite)
self.assertEqual(load_tests_args, [loader, suite, None])
# With Python 3.5, the undocumented and unofficial use_load_tests is
# ignored (and deprecated).
load_tests_args = []
with warnings.catch_warnings(record=False):
warnings.simplefilter('never')
suite = loader.loadTestsFromModule(m, use_load_tests=False)
self.assertEqual(load_tests_args, [loader, suite, None])
def test_loadTestsFromModule__use_load_tests_other_bad_keyword(self):
m = types.ModuleType('m')
class MyTestCase(unittest.TestCase):
def test(self):
pass
m.testcase_1 = MyTestCase
load_tests_args = []
def load_tests(loader, tests, pattern):
self.assertIsInstance(tests, unittest.TestSuite)
load_tests_args.extend((loader, tests, pattern))
return tests
m.load_tests = load_tests
loader = unittest.TestLoader()
with warnings.catch_warnings():
warnings.simplefilter('never')
with self.assertRaises(TypeError) as cm:
loader.loadTestsFromModule(
m, use_load_tests=False, very_bad=True, worse=False)
self.assertEqual(type(cm.exception), TypeError)
# The error message names the first bad argument alphabetically,
# however use_load_tests (which sorts first) is ignored.
self.assertEqual(
str(cm.exception),
"loadTestsFromModule() got an unexpected keyword argument 'very_bad'")
def _build_program(self):
code = "\n".join(itertools.chain(self.common_header.pieces,
itertools.chain.from_iterable(cu.pieces for cu in self._compile_units)))
program = pyopencl.Program(self.context, code)
with warnings.catch_warnings():
# Intel OpenCL generates non empty output and that causes warnings
# from pyopencl. We just silence them.
warnings.simplefilter("ignore")
return program.build(options=DEFAULT_COMPILER_OPTIONS)
# Working around bug in pyopencl.Program.compile in pyopencl
# (https://lists.tiker.net/pipermail/pyopencl/2015-September/001986.html)
# TODO: Fix this in OpenCL
# compiled_units = [unit.compile(self.context, self.common_header.pieces)
# for unit in self._compile_units]
# return pyopencl.link_program(self.context, compiled_units)
def _build_program(self):
code = "\n".join(itertools.chain(self.common_header.pieces,
itertools.chain.from_iterable(cu.pieces for cu in self._compile_units)))
program = pyopencl.Program(self.context, code)
with warnings.catch_warnings():
# Intel OpenCL generates non empty output and that causes warnings
# from pyopencl. We just silence them.
warnings.simplefilter("ignore")
return program.build(options=DEFAULT_COMPILER_OPTIONS)
# Working around bug in pyopencl.Program.compile in pyopencl
# (https://lists.tiker.net/pipermail/pyopencl/2015-September/001986.html)
# TODO: Fix this in OpenCL
# compiled_units = [unit.compile(self.context, self.common_header.pieces)
# for unit in self._compile_units]
# return pyopencl.link_program(self.context, compiled_units)
def test_pipe_handle(self):
h, _ = windows_utils.pipe(overlapped=(True, True))
_winapi.CloseHandle(_)
p = windows_utils.PipeHandle(h)
self.assertEqual(p.fileno(), h)
self.assertEqual(p.handle, h)
# check garbage collection of p closes handle
with warnings.catch_warnings():
warnings.filterwarnings("ignore", "", ResourceWarning)
del p
support.gc_collect()
try:
_winapi.CloseHandle(h)
except OSError as e:
self.assertEqual(e.winerror, 6) # ERROR_INVALID_HANDLE
else:
raise RuntimeError('expected ERROR_INVALID_HANDLE')